From cdc1facf398e6d99cc746b70350ff754e2a975ad Mon Sep 17 00:00:00 2001 From: joewitt Date: Tue, 5 Dec 2017 14:44:07 -0500 Subject: [PATCH] NIFI-4664, NIFI-4662, NIFI-4660, NIFI-4659 moved tests which are timing/threading/network dependent and brittle to integration tests and un-ignored tests that are IT. Updated travis to reduce impact on infra and appveyor now skips test runs so is just to prove build works on windows. This closes #2319 squash --- .travis.yml | 4 +- appveyor.yml | 4 +- .../util/list/ITAbstractListProcessor.java | 426 ++++++++++++++++++ .../util/list/TestAbstractListProcessor.java | 353 +-------------- .../kafka/pubsub/ConsumeKafkaTest.java | 98 ---- .../kafka/pubsub/ITConsumeKafka.java | 135 ++++++ .../kafka/pubsub/ConsumeKafkaTest.java | 98 ---- .../kafka/pubsub/ITConsumeKafka.java | 135 ++++++ .../kafka/pubsub/ConsumeKafkaTest.java | 110 ----- .../kafka/pubsub/ITConsumeKafka.java | 150 ++++++ .../kafka/pubsub/ConsumeKafkaTest.java | 98 ---- .../kafka/pubsub/ITConsumeKafka.java | 135 ++++++ ... => ITLumberjackSocketChannelHandler.java} | 2 +- ...est.groovy => ITListenSyslogGroovy.groovy} | 2 +- .../processors/standard/ITListenSyslog.java | 402 +++++++++++++++++ .../processors/standard/TestListenSyslog.java | 356 +-------------- pom.xml | 4 +- 17 files changed, 1401 insertions(+), 1111 deletions(-) create mode 100644 nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/ITAbstractListProcessor.java create mode 100644 nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java create mode 100644 nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java create mode 100644 nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java create mode 100644 nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java rename nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/{TestLumberjackSocketChannelHandler.java => ITLumberjackSocketChannelHandler.java} (99%) rename nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/{ListenSyslogGroovyTest.groovy => ITListenSyslogGroovy.groovy} (96%) create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITListenSyslog.java diff --git a/.travis.yml b/.travis.yml index d7f0f039ec3c..e5b0ccbe0e8f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -19,8 +19,6 @@ env: - USER_LANGUAGE=en USER_REGION=US' - USER_LANGUAGE=fr USER_REGION=FR' - USER_LANGUAGE=ja USER_REGION=JP' - - USER_LANGUAGE=pt USER_REGION=BR' - - USER_LANGUAGE=default USER_REGION=default os: - linux @@ -54,4 +52,4 @@ install: # Note: The reason the sed is done as part of script is to ensure the pom hack # won't affect the 'clean install' above - bash .travis.sh - - mvn -T 2C -Pcontrib-check -Ddir-only clean install \ No newline at end of file + - mvn -T 2C -Pcontrib-check -Ddir-only clean install diff --git a/appveyor.yml b/appveyor.yml index c7aa2f2c55f1..3e31c191cead 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -32,9 +32,7 @@ install: - cmd: SET MAVEN_OPTS=-XX:MaxPermSize=2g -Xmx4g - cmd: SET JAVA_OPTS=-XX:MaxPermSize=2g 
-Xmx4g build_script: - - mvn -q clean package --batch-mode -DskipTests -test_script: - - mvn -q clean install --batch-mode -Pcontrib-check + - mvn clean package --batch-mode -DskipTests -Ddir-only cache: - C:\maven\ - C:\Users\appveyor\.m2 diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/ITAbstractListProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/ITAbstractListProcessor.java new file mode 100644 index 000000000000..dcf47c6ac9fb --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/ITAbstractListProcessor.java @@ -0,0 +1,426 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.list; + +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.TestWatcher; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.nifi.processor.util.list.AbstractListProcessor.PRECISION_MILLIS; +import static org.apache.nifi.processor.util.list.AbstractListProcessor.PRECISION_MINUTES; +import static org.apache.nifi.processor.util.list.AbstractListProcessor.PRECISION_SECONDS; +import static org.apache.nifi.processor.util.list.AbstractListProcessor.TARGET_SYSTEM_TIMESTAMP_PRECISION; +import org.apache.nifi.processor.util.list.TestAbstractListProcessor.ConcreteListProcessor; +import org.apache.nifi.processor.util.list.TestAbstractListProcessor.DistributedCache; +import static org.junit.Assert.assertEquals; + +public class ITAbstractListProcessor { + + /** + * @return current timestamp in milliseconds, but truncated at specified + * target precision (e.g. SECONDS or MINUTES). 
+ */ + private static long getCurrentTimestampMillis(final TimeUnit targetPrecision) { + final long timestampInTargetPrecision = targetPrecision.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS); + return TimeUnit.MILLISECONDS.convert(timestampInTargetPrecision, targetPrecision); + } + + private static long getSleepMillis(final TimeUnit targetPrecision) { + return AbstractListProcessor.LISTING_LAG_MILLIS.get(targetPrecision) * 2; + } + + private static final long DEFAULT_SLEEP_MILLIS = getSleepMillis(TimeUnit.MILLISECONDS); + + private ConcreteListProcessor proc; + private TestRunner runner; + + @Rule + public TestWatcher dumpState = new ListProcessorTestWatcher( + () -> { + try { + return runner.getStateManager().getState(Scope.LOCAL).toMap(); + } catch (IOException e) { + throw new RuntimeException("Failed to retrieve state", e); + } + }, + () -> proc.entities, + () -> runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).stream().map(m -> (FlowFile) m).collect(Collectors.toList()) + ); + + @Before + public void setup() { + proc = new ConcreteListProcessor(); + runner = TestRunners.newTestRunner(proc); + } + + @Rule + public final TemporaryFolder testFolder = new TemporaryFolder(); + + /** + *
+     * <p>
+     * Ensures that files are listed when they are old enough:
+     * <ul>
+     * <li>Files whose last modified timestamp is old enough to determine
+     * that they are completely written and no further files are expected
+     * to be added with the same timestamp.</li>
+     * <li>This behavior is expected when a processor is scheduled less
+     * frequently, such as hourly or daily.</li>
+     * </ul>
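+     * <p>
+     * For example, two entities that existed before the processor's first run
+     * are both emitted on the first iteration, while a later iteration that
+     * introduces no new entities emits nothing (see the test below).
+     * </p>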
    + */ + @Test + public void testAllExistingEntriesEmittedOnFirstIteration() throws Exception { + final long oldTimestamp = System.currentTimeMillis() - getSleepMillis(TimeUnit.MILLISECONDS); + + // These entries have existed before the processor runs at the first time. + proc.addEntity("name", "id", oldTimestamp); + proc.addEntity("name", "id2", oldTimestamp); + + // First run, the above listed entries should be emitted since it has existed. + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2); + runner.clearTransferState(); + + // Ensure we have covered the necessary lag period to avoid issues where the processor was immediately scheduled to run again + Thread.sleep(DEFAULT_SLEEP_MILLIS); + + // Run again without introducing any new entries + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + } + + private void testPreviouslySkippedEntriesEmmitedOnNextIteration(final TimeUnit targetPrecision) throws InterruptedException { + runner.run(); + + final long initialTimestamp = getCurrentTimestampMillis(targetPrecision); + + setTargetSystemTimestampPrecision(targetPrecision); + + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + proc.addEntity("name", "id", initialTimestamp); + proc.addEntity("name", "id2", initialTimestamp); + runner.run(); + + // First run, the above listed entries would be skipped to avoid write synchronization issues + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + runner.clearTransferState(); + + // Ensure we have covered the necessary lag period to avoid issues where the processor was immediately scheduled to run again + Thread.sleep(getSleepMillis(targetPrecision)); + + // Run again without introducing any new entries + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2); + } + + /** + *
+     * <p>
+     * Ensures that newly created files wait until it is confirmed that no
+     * more files are being created with the same timestamp:
+     * <ul>
+     * <li>If files have the latest modified timestamp at an iteration,
+     * listing them is postponed.</li>
+     * <li>If those files are still the latest files at the next iteration,
+     * they are listed.</li>
+     * </ul>
    + */ + @Test + public void testPreviouslySkippedEntriesEmittedOnNextIterationMilliPrecision() throws Exception { + testPreviouslySkippedEntriesEmmitedOnNextIteration(TimeUnit.MILLISECONDS); + } + + /** + * Same as + * {@link #testPreviouslySkippedEntriesEmittedOnNextIterationMilliPrecision()} + * but simulates that the target filesystem only provide timestamp precision + * in Seconds. + */ + @Test + public void testPreviouslySkippedEntriesEmittedOnNextIterationSecondPrecision() throws Exception { + testPreviouslySkippedEntriesEmmitedOnNextIteration(TimeUnit.SECONDS); + } + + /** + * Same as + * {@link #testPreviouslySkippedEntriesEmittedOnNextIterationMilliPrecision()} + * but simulates that the target filesystem only provide timestamp precision + * in Minutes. + */ + @Test + public void testPreviouslySkippedEntriesEmittedOnNextIterationMinutesPrecision() throws Exception { + testPreviouslySkippedEntriesEmmitedOnNextIteration(TimeUnit.MINUTES); + } + + private void testOnlyNewEntriesEmitted(final TimeUnit targetPrecision) throws InterruptedException { + + final long initialTimestamp = getCurrentTimestampMillis(targetPrecision); + + setTargetSystemTimestampPrecision(targetPrecision); + + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + proc.addEntity("name", "id", initialTimestamp); + proc.addEntity("name", "id2", initialTimestamp); + runner.run(); + + // First run, the above listed entries would be skipped to avoid write synchronization issues + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + runner.clearTransferState(); + + // Ensure we have covered the necessary lag period to avoid issues where the processor was immediately scheduled to run again + Thread.sleep(getSleepMillis(targetPrecision)); + + // Running again, our two previously seen files are now cleared to be released + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2); + runner.clearTransferState(); + + // Verify no new old files show up + proc.addEntity("name", "id2", initialTimestamp); + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + runner.clearTransferState(); + + // An entry that is older than already processed entry should not be listed. + proc.addEntity("name", "id3", initialTimestamp - targetPrecision.toMillis(1)); + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + runner.clearTransferState(); + + // If an entry whose timestamp is the same with the last processed timestamp should not be listed. 
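+        // (An entity already processed at that timestamp must not be listed again.)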
+ proc.addEntity("name", "id2", initialTimestamp); + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + runner.clearTransferState(); + + // Now a new file beyond the current time enters + proc.addEntity("name", "id2", initialTimestamp + targetPrecision.toMillis(1)); + + // It should show up + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1); + runner.clearTransferState(); + } + + private void setTargetSystemTimestampPrecision(TimeUnit targetPrecision) { + switch (targetPrecision) { + case MINUTES: + runner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, PRECISION_MINUTES); + break; + case SECONDS: + runner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, PRECISION_SECONDS); + break; + case MILLISECONDS: + runner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, PRECISION_MILLIS); + break; + } + } + + @Test + public void testOnlyNewEntriesEmittedMillisPrecision() throws Exception { + testOnlyNewEntriesEmitted(TimeUnit.MILLISECONDS); + } + + @Test + public void testOnlyNewEntriesEmittedSecondPrecision() throws Exception { + testOnlyNewEntriesEmitted(TimeUnit.SECONDS); + } + + @Test + public void testOnlyNewEntriesEmittedMinutesPrecision() throws Exception { + testOnlyNewEntriesEmitted(TimeUnit.MINUTES); + } + + @Test + public void testHandleRestartWithEntriesAlreadyTransferredAndNoneNew() throws Exception { + + final long initialTimestamp = System.currentTimeMillis(); + + proc.addEntity("name", "id", initialTimestamp); + proc.addEntity("name", "id2", initialTimestamp); + + // Emulate having state but not having had the processor run such as in a restart + final Map preexistingState = new HashMap<>(); + preexistingState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, Long.toString(initialTimestamp)); + preexistingState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, Long.toString(initialTimestamp)); + preexistingState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".0", "id"); + preexistingState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".1", "id2"); + runner.getStateManager().setState(preexistingState, Scope.CLUSTER); + + // run for the first time + runner.run(); + + // First run, the above listed entries would be skipped + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + runner.clearTransferState(); + + // Ensure we have covered the necessary lag period to avoid issues where the processor was immediately scheduled to run again + Thread.sleep(DEFAULT_SLEEP_MILLIS); + + // Running again, these files should be eligible for transfer and again skipped + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + runner.clearTransferState(); + + // Verify no new old files show up + proc.addEntity("name", "id2", initialTimestamp); + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + runner.clearTransferState(); + + proc.addEntity("name", "id3", initialTimestamp - 1); + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + runner.clearTransferState(); + + proc.addEntity("name", "id2", initialTimestamp); + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + runner.clearTransferState(); + + // Now a new file beyond the current time enters + proc.addEntity("name", "id2", initialTimestamp + 1); + + // It should now show up + runner.run(); + 
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1); + runner.clearTransferState(); + } + + @Test + public void testStateStoredInClusterStateManagement() throws Exception { + + final DistributedCache cache = new DistributedCache(); + runner.addControllerService("cache", cache); + runner.enableControllerService(cache); + runner.setProperty(AbstractListProcessor.DISTRIBUTED_CACHE_SERVICE, "cache"); + + final long initialTimestamp = System.currentTimeMillis(); + + proc.addEntity("name", "id", initialTimestamp); + runner.run(); + + final Map expectedState = new HashMap<>(); + // Ensure only timestamp is migrated + expectedState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, String.valueOf(initialTimestamp)); + expectedState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, "0"); + runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER); + + Thread.sleep(DEFAULT_SLEEP_MILLIS); + + runner.run(); + // Ensure only timestamp is migrated + expectedState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, String.valueOf(initialTimestamp)); + expectedState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, String.valueOf(initialTimestamp)); + expectedState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".0", "id"); + runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER); + } + + @Test + public void testResumeListingAfterClearingState() throws Exception { + + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + + final long initialEventTimestamp = System.currentTimeMillis(); + proc.addEntity("name", "id", initialEventTimestamp); + proc.addEntity("name", "id2", initialEventTimestamp); + + // Add entities but these should not be transferred as they are the latest values + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + + // after providing a pause in listings, the files should now transfer + Thread.sleep(DEFAULT_SLEEP_MILLIS); + + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2); + runner.clearTransferState(); + + // Verify entities are not transferred again for the given state + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + runner.clearTransferState(); + + // Clear state for this processor, eradicating timestamp + runner.getStateManager().clear(Scope.CLUSTER); + Assert.assertEquals("State is not empty for this component after clearing", 0, runner.getStateManager().getState(Scope.CLUSTER).toMap().size()); + + // Ensure the original files are now transferred again. 
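+        // With the stored timestamp gone, the previously listed entities look new again.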
+ runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2); + runner.clearTransferState(); + } + + @Test + public void testOnlyNewStateStored() throws Exception { + + runner.run(); + + final long initialTimestamp = System.currentTimeMillis(); + + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + proc.addEntity("name", "id", initialTimestamp); + proc.addEntity("name", "id2", initialTimestamp); + + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + runner.clearTransferState(); + + Thread.sleep(DEFAULT_SLEEP_MILLIS); + + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2); + runner.clearTransferState(); + + final StateMap stateMap = runner.getStateManager().getState(Scope.CLUSTER); + assertEquals(2, stateMap.getVersion()); + + final Map map = stateMap.toMap(); + // Ensure timestamp and identifiers are migrated + assertEquals(4, map.size()); + assertEquals(Long.toString(initialTimestamp), map.get(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY)); + assertEquals(Long.toString(initialTimestamp), map.get(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY)); + assertEquals("id", map.get(AbstractListProcessor.IDENTIFIER_PREFIX + ".0")); + assertEquals("id2", map.get(AbstractListProcessor.IDENTIFIER_PREFIX + ".1")); + + proc.addEntity("new name", "new id", initialTimestamp + 1); + runner.run(); + + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1); + runner.clearTransferState(); + + StateMap updatedStateMap = runner.getStateManager().getState(Scope.CLUSTER); + assertEquals(3, updatedStateMap.getVersion()); + + assertEquals(3, updatedStateMap.toMap().size()); + assertEquals(Long.toString(initialTimestamp + 1), updatedStateMap.get(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY)); + // Processed timestamp is now caught up + assertEquals(Long.toString(initialTimestamp + 1), updatedStateMap.get(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY)); + assertEquals("new id", updatedStateMap.get(AbstractListProcessor.IDENTIFIER_PREFIX + ".0")); + } + +} diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java index 4a376e235cab..f5eae46f9715 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java @@ -19,7 +19,6 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.state.Scope; -import org.apache.nifi.components.state.StateMap; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.distributed.cache.client.Deserializer; import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; @@ -33,7 +32,6 @@ import org.apache.nifi.util.TestRunners; import org.junit.Assert; import org.junit.Before; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -54,10 +52,6 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; -import static 
org.apache.nifi.processor.util.list.AbstractListProcessor.PRECISION_MILLIS; -import static org.apache.nifi.processor.util.list.AbstractListProcessor.PRECISION_MINUTES; -import static org.apache.nifi.processor.util.list.AbstractListProcessor.PRECISION_SECONDS; -import static org.apache.nifi.processor.util.list.AbstractListProcessor.TARGET_SYSTEM_TIMESTAMP_PRECISION; import static org.junit.Assert.assertEquals; public class TestAbstractListProcessor { @@ -101,257 +95,6 @@ public void setup() { @Rule public final TemporaryFolder testFolder = new TemporaryFolder(); - /** - *
-     * <p>
-     * Ensures that files are listed when those are old enough:
-     * <ul>
-     * <li>Files with last modified timestamp those are old enough to determine that those are completely written
-     * and no further files are expected to be added with the same timestamp.</li>
-     * <li>This behavior is expected when a processor is scheduled less frequently, such as hourly or daily.</li>
-     * </ul>
    - */ - @Test - public void testAllExistingEntriesEmittedOnFirstIteration() throws Exception { - final long oldTimestamp = System.currentTimeMillis() - getSleepMillis(TimeUnit.MILLISECONDS); - - // These entries have existed before the processor runs at the first time. - proc.addEntity("name", "id", oldTimestamp); - proc.addEntity("name", "id2", oldTimestamp); - - // First run, the above listed entries should be emitted since it has existed. - runner.run(); - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2); - runner.clearTransferState(); - - // Ensure we have covered the necessary lag period to avoid issues where the processor was immediately scheduled to run again - Thread.sleep(DEFAULT_SLEEP_MILLIS); - - // Run again without introducing any new entries - runner.run(); - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); - } - - private void testPreviouslySkippedEntriesEmmitedOnNextIteration(final TimeUnit targetPrecision) throws InterruptedException { - runner.run(); - - final long initialTimestamp = getCurrentTimestampMillis(targetPrecision); - - setTargetSystemTimestampPrecision(targetPrecision); - - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); - proc.addEntity("name", "id", initialTimestamp); - proc.addEntity("name", "id2", initialTimestamp); - runner.run(); - - // First run, the above listed entries would be skipped to avoid write synchronization issues - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); - runner.clearTransferState(); - - // Ensure we have covered the necessary lag period to avoid issues where the processor was immediately scheduled to run again - Thread.sleep(getSleepMillis(targetPrecision)); - - // Run again without introducing any new entries - runner.run(); - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2); - } - - /** - *
-     * <p>
-     * Ensures that newly created files should wait to confirm there is no more files created with the same timestamp:
-     * <ul>
-     * <li>If files have the latest modified timestamp at an iteration, then those should be postponed to be listed</li>
-     * <li>If those files still are the latest files at the next iteration, then those should be listed</li>
-     * </ul>
    - */ - @Test - public void testPreviouslySkippedEntriesEmittedOnNextIterationMilliPrecision() throws Exception { - testPreviouslySkippedEntriesEmmitedOnNextIteration(TimeUnit.MILLISECONDS); - } - - /** - * Same as {@link #testPreviouslySkippedEntriesEmittedOnNextIterationMilliPrecision()} but simulates that the target - * filesystem only provide timestamp precision in Seconds. - */ - @Test - public void testPreviouslySkippedEntriesEmittedOnNextIterationSecondPrecision() throws Exception { - testPreviouslySkippedEntriesEmmitedOnNextIteration(TimeUnit.SECONDS); - } - - /** - * Same as {@link #testPreviouslySkippedEntriesEmittedOnNextIterationMilliPrecision()} but simulates that the target - * filesystem only provide timestamp precision in Minutes. - * This test is ignored because it needs to wait two minutes. Not good for automated unit testing, but still valuable when executed manually. - */ - @Ignore - @Test - public void testPreviouslySkippedEntriesEmittedOnNextIterationMinutesPrecision() throws Exception { - testPreviouslySkippedEntriesEmmitedOnNextIteration(TimeUnit.MINUTES); - } - - private void testOnlyNewEntriesEmitted(final TimeUnit targetPrecision) throws InterruptedException { - - final long initialTimestamp = getCurrentTimestampMillis(targetPrecision); - - setTargetSystemTimestampPrecision(targetPrecision); - - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); - proc.addEntity("name", "id", initialTimestamp); - proc.addEntity("name", "id2", initialTimestamp); - runner.run(); - - // First run, the above listed entries would be skipped to avoid write synchronization issues - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); - runner.clearTransferState(); - - // Ensure we have covered the necessary lag period to avoid issues where the processor was immediately scheduled to run again - Thread.sleep(getSleepMillis(targetPrecision)); - - // Running again, our two previously seen files are now cleared to be released - runner.run(); - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2); - runner.clearTransferState(); - - // Verify no new old files show up - proc.addEntity("name", "id2", initialTimestamp); - runner.run(); - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); - runner.clearTransferState(); - - // An entry that is older than already processed entry should not be listed. - proc.addEntity("name", "id3", initialTimestamp - targetPrecision.toMillis(1)); - runner.run(); - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); - runner.clearTransferState(); - - // If an entry whose timestamp is the same with the last processed timestamp should not be listed. 
- proc.addEntity("name", "id2", initialTimestamp); - runner.run(); - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); - runner.clearTransferState(); - - // Now a new file beyond the current time enters - proc.addEntity("name", "id2", initialTimestamp + targetPrecision.toMillis(1)); - - // It should show up - runner.run(); - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1); - runner.clearTransferState(); - } - - private void setTargetSystemTimestampPrecision(TimeUnit targetPrecision) { - switch (targetPrecision) { - case MINUTES: - runner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, PRECISION_MINUTES); - break; - case SECONDS: - runner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, PRECISION_SECONDS); - break; - case MILLISECONDS: - runner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, PRECISION_MILLIS); - break; - } - } - - @Test - public void testOnlyNewEntriesEmittedMillisPrecision() throws Exception { - testOnlyNewEntriesEmitted(TimeUnit.MILLISECONDS); - } - - @Test - public void testOnlyNewEntriesEmittedSecondPrecision() throws Exception { - testOnlyNewEntriesEmitted(TimeUnit.SECONDS); - } - - /** - * This test is ignored because it needs to wait two minutes. Not good for automated unit testing, but still valuable when executed manually. - */ - @Ignore - @Test - public void testOnlyNewEntriesEmittedMinutesPrecision() throws Exception { - testOnlyNewEntriesEmitted(TimeUnit.MINUTES); - } - - @Test - public void testHandleRestartWithEntriesAlreadyTransferredAndNoneNew() throws Exception { - - final long initialTimestamp = System.currentTimeMillis(); - - proc.addEntity("name", "id", initialTimestamp); - proc.addEntity("name", "id2", initialTimestamp); - - // Emulate having state but not having had the processor run such as in a restart - final Map preexistingState = new HashMap<>(); - preexistingState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, Long.toString(initialTimestamp)); - preexistingState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, Long.toString(initialTimestamp)); - preexistingState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".0", "id"); - preexistingState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".1", "id2"); - runner.getStateManager().setState(preexistingState, Scope.CLUSTER); - - // run for the first time - runner.run(); - - // First run, the above listed entries would be skipped - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); - runner.clearTransferState(); - - // Ensure we have covered the necessary lag period to avoid issues where the processor was immediately scheduled to run again - Thread.sleep(DEFAULT_SLEEP_MILLIS); - - // Running again, these files should be eligible for transfer and again skipped - runner.run(); - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); - runner.clearTransferState(); - - // Verify no new old files show up - proc.addEntity("name", "id2", initialTimestamp); - runner.run(); - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); - runner.clearTransferState(); - - proc.addEntity("name", "id3", initialTimestamp - 1); - runner.run(); - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); - runner.clearTransferState(); - - proc.addEntity("name", "id2", initialTimestamp); - runner.run(); - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); - runner.clearTransferState(); - - // Now a new file beyond the current time enters - 
proc.addEntity("name", "id2", initialTimestamp + 1); - - // It should now show up - runner.run(); - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1); - runner.clearTransferState(); - } - - @Test - public void testStateStoredInClusterStateManagement() throws Exception { - - final DistributedCache cache = new DistributedCache(); - runner.addControllerService("cache", cache); - runner.enableControllerService(cache); - runner.setProperty(AbstractListProcessor.DISTRIBUTED_CACHE_SERVICE, "cache"); - - final long initialTimestamp = System.currentTimeMillis(); - - proc.addEntity("name", "id", initialTimestamp); - runner.run(); - - final Map expectedState = new HashMap<>(); - // Ensure only timestamp is migrated - expectedState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, String.valueOf(initialTimestamp)); - expectedState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, "0"); - runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER); - - Thread.sleep(DEFAULT_SLEEP_MILLIS); - - runner.run(); - // Ensure only timestamp is migrated - expectedState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, String.valueOf(initialTimestamp)); - expectedState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, String.valueOf(initialTimestamp)); - expectedState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".0", "id"); - runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER); - } - @Test public void testStateMigratedFromCacheService() throws InitializationException { @@ -415,41 +158,6 @@ public void testStateMigratedFromLocalFile() throws Exception { runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER); } - @Test - public void testResumeListingAfterClearingState() throws Exception { - - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); - - final long initialEventTimestamp = System.currentTimeMillis(); - proc.addEntity("name", "id", initialEventTimestamp); - proc.addEntity("name", "id2", initialEventTimestamp); - - // Add entities but these should not be transferred as they are the latest values - runner.run(); - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); - - // after providing a pause in listings, the files should now transfer - Thread.sleep(DEFAULT_SLEEP_MILLIS); - - runner.run(); - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2); - runner.clearTransferState(); - - // Verify entities are not transferred again for the given state - runner.run(); - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); - runner.clearTransferState(); - - // Clear state for this processor, eradicating timestamp - runner.getStateManager().clear(Scope.CLUSTER); - Assert.assertEquals("State is not empty for this component after clearing", 0, runner.getStateManager().getState(Scope.CLUSTER).toMap().size()); - - // Ensure the original files are now transferred again. 
- runner.run(); - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2); - runner.clearTransferState(); - } - @Test public void testFetchOnStart() throws InitializationException { @@ -463,55 +171,7 @@ public void testFetchOnStart() throws InitializationException { assertEquals(1, cache.fetchCount); } - @Test - public void testOnlyNewStateStored() throws Exception { - - runner.run(); - - final long initialTimestamp = System.currentTimeMillis(); - - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); - proc.addEntity("name", "id", initialTimestamp); - proc.addEntity("name", "id2", initialTimestamp); - - runner.run(); - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); - runner.clearTransferState(); - - Thread.sleep(DEFAULT_SLEEP_MILLIS); - - runner.run(); - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2); - runner.clearTransferState(); - - final StateMap stateMap = runner.getStateManager().getState(Scope.CLUSTER); - assertEquals(2, stateMap.getVersion()); - - final Map map = stateMap.toMap(); - // Ensure timestamp and identifiers are migrated - assertEquals(4, map.size()); - assertEquals(Long.toString(initialTimestamp), map.get(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY)); - assertEquals(Long.toString(initialTimestamp), map.get(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY)); - assertEquals("id", map.get(AbstractListProcessor.IDENTIFIER_PREFIX + ".0")); - assertEquals("id2", map.get(AbstractListProcessor.IDENTIFIER_PREFIX + ".1")); - - proc.addEntity("new name", "new id", initialTimestamp + 1); - runner.run(); - - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1); - runner.clearTransferState(); - - StateMap updatedStateMap = runner.getStateManager().getState(Scope.CLUSTER); - assertEquals(3, updatedStateMap.getVersion()); - - assertEquals(3, updatedStateMap.toMap().size()); - assertEquals(Long.toString(initialTimestamp + 1), updatedStateMap.get(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY)); - // Processed timestamp is now caught up - assertEquals(Long.toString(initialTimestamp + 1), updatedStateMap.get(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY)); - assertEquals("new id", updatedStateMap.get(AbstractListProcessor.IDENTIFIER_PREFIX + ".0")); - } - - private static class DistributedCache extends AbstractControllerService implements DistributedMapCacheClient { + static class DistributedCache extends AbstractControllerService implements DistributedMapCacheClient { private final Map stored = new HashMap<>(); private int fetchCount = 0; @@ -569,13 +229,12 @@ public long removeByPattern(String regex) throws IOException { } } + static class ConcreteListProcessor extends AbstractListProcessor { + final List entities = new ArrayList<>(); - private static class ConcreteListProcessor extends AbstractListProcessor { - private final List entities = new ArrayList<>(); - - public final String persistenceFilename = "ListProcessor-local-state-" + UUID.randomUUID().toString() + ".json"; - public String persistenceFolder = "target/"; - public File persistenceFile = new File(persistenceFolder + persistenceFilename); + final String persistenceFilename = "ListProcessor-local-state-" + UUID.randomUUID().toString() + ".json"; + String persistenceFolder = "target/"; + File persistenceFile = new File(persistenceFolder + persistenceFilename); @Override public File getPersistenceFile() { diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java index bd93e3bbb310..3496ea0ca30c 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java @@ -18,17 +18,10 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Matchers.anyObject; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.ByteArrayDeserializer; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Before; @@ -99,97 +92,6 @@ public void validatePropertiesValidation() throws Exception { } } - @Test - public void validateGetAllMessages() throws Exception { - String groupName = "validateGetAllMessages"; - - when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease); - when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE); - when(mockLease.commit()).thenReturn(Boolean.TRUE); - - ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() { - @Override - protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { - return mockConsumerPool; - } - }; - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); - runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar"); - runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName); - runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST); - runner.run(1, false); - - verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject()); - verify(mockLease, times(3)).continuePolling(); - verify(mockLease, times(2)).poll(); - verify(mockLease, times(1)).commit(); - verify(mockLease, times(1)).close(); - verifyNoMoreInteractions(mockConsumerPool); - verifyNoMoreInteractions(mockLease); - } - - @Test - public void validateGetAllMessagesPattern() throws Exception { - String groupName = "validateGetAllMessagesPattern"; - - when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease); - when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE); - when(mockLease.commit()).thenReturn(Boolean.TRUE); - - ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() { - @Override - protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { - return mockConsumerPool; - } - }; - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); - runner.setProperty(ConsumeKafka_0_10.TOPICS, "(fo.*)|(ba)"); - runner.setProperty(ConsumeKafka_0_10.TOPIC_TYPE, "pattern"); - runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName); - 
runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST); - runner.run(1, false); - - verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject()); - verify(mockLease, times(3)).continuePolling(); - verify(mockLease, times(2)).poll(); - verify(mockLease, times(1)).commit(); - verify(mockLease, times(1)).close(); - verifyNoMoreInteractions(mockConsumerPool); - verifyNoMoreInteractions(mockLease); - } - - @Test - public void validateGetErrorMessages() throws Exception { - String groupName = "validateGetErrorMessages"; - - when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease); - when(mockLease.continuePolling()).thenReturn(true, false); - when(mockLease.commit()).thenReturn(Boolean.FALSE); - - ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() { - @Override - protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { - return mockConsumerPool; - } - }; - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); - runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar"); - runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName); - runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST); - runner.run(1, false); - - verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject()); - verify(mockLease, times(2)).continuePolling(); - verify(mockLease, times(1)).poll(); - verify(mockLease, times(1)).commit(); - verify(mockLease, times(1)).close(); - verifyNoMoreInteractions(mockConsumerPool); - verifyNoMoreInteractions(mockLease); - } - @Test public void testJaasConfiguration() throws Exception { ConsumeKafka_0_10 consumeKafka = new ConsumeKafka_0_10(); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java new file mode 100644 index 000000000000..b0c6ebaf9f18 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.kafka.pubsub; + +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +public class ITConsumeKafka { + + ConsumerLease mockLease = null; + ConsumerPool mockConsumerPool = null; + + @Before + public void setup() { + mockLease = mock(ConsumerLease.class); + mockConsumerPool = mock(ConsumerPool.class); + } + + @Test + public void validateGetAllMessages() throws Exception { + String groupName = "validateGetAllMessages"; + + when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease); + when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE); + when(mockLease.commit()).thenReturn(Boolean.TRUE); + + ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() { + @Override + protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { + return mockConsumerPool; + } + }; + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); + runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar"); + runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName); + runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST); + runner.run(1, false); + + verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject()); + verify(mockLease, times(3)).continuePolling(); + verify(mockLease, times(2)).poll(); + verify(mockLease, times(1)).commit(); + verify(mockLease, times(1)).close(); + verifyNoMoreInteractions(mockConsumerPool); + verifyNoMoreInteractions(mockLease); + } + + @Test + public void validateGetAllMessagesPattern() throws Exception { + String groupName = "validateGetAllMessagesPattern"; + + when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease); + when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE); + when(mockLease.commit()).thenReturn(Boolean.TRUE); + + ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() { + @Override + protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { + return mockConsumerPool; + } + }; + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); + runner.setProperty(ConsumeKafka_0_10.TOPICS, "(fo.*)|(ba)"); + runner.setProperty(ConsumeKafka_0_10.TOPIC_TYPE, "pattern"); + runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName); + runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST); + runner.run(1, false); + + verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject()); + verify(mockLease, times(3)).continuePolling(); + verify(mockLease, times(2)).poll(); + verify(mockLease, times(1)).commit(); + verify(mockLease, times(1)).close(); + verifyNoMoreInteractions(mockConsumerPool); + verifyNoMoreInteractions(mockLease); + } + + @Test + public void validateGetErrorMessages() throws Exception { + String groupName = "validateGetErrorMessages"; + + 
when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease); + when(mockLease.continuePolling()).thenReturn(true, false); + when(mockLease.commit()).thenReturn(Boolean.FALSE); + + ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() { + @Override + protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { + return mockConsumerPool; + } + }; + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); + runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar"); + runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName); + runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST); + runner.run(1, false); + + verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject()); + verify(mockLease, times(2)).continuePolling(); + verify(mockLease, times(1)).poll(); + verify(mockLease, times(1)).commit(); + verify(mockLease, times(1)).close(); + verifyNoMoreInteractions(mockConsumerPool); + verifyNoMoreInteractions(mockLease); + } + +} diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java index e46087949c9c..b1edd1f7181c 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java @@ -18,17 +18,10 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Matchers.anyObject; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.ByteArrayDeserializer; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Before; @@ -99,97 +92,6 @@ public void validatePropertiesValidation() throws Exception { } } - @Test - public void validateGetAllMessages() throws Exception { - String groupName = "validateGetAllMessages"; - - when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease); - when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE); - when(mockLease.commit()).thenReturn(Boolean.TRUE); - - ConsumeKafka_0_11 proc = new ConsumeKafka_0_11() { - @Override - protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { - return mockConsumerPool; - } - }; - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); - runner.setProperty(ConsumeKafka_0_11.TOPICS, "foo,bar"); - runner.setProperty(ConsumeKafka_0_11.GROUP_ID, groupName); - runner.setProperty(ConsumeKafka_0_11.AUTO_OFFSET_RESET, ConsumeKafka_0_11.OFFSET_EARLIEST); - runner.run(1, false); - - verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject()); - verify(mockLease, 
times(3)).continuePolling(); - verify(mockLease, times(2)).poll(); - verify(mockLease, times(1)).commit(); - verify(mockLease, times(1)).close(); - verifyNoMoreInteractions(mockConsumerPool); - verifyNoMoreInteractions(mockLease); - } - - @Test - public void validateGetAllMessagesPattern() throws Exception { - String groupName = "validateGetAllMessagesPattern"; - - when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease); - when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE); - when(mockLease.commit()).thenReturn(Boolean.TRUE); - - ConsumeKafka_0_11 proc = new ConsumeKafka_0_11() { - @Override - protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { - return mockConsumerPool; - } - }; - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); - runner.setProperty(ConsumeKafka_0_11.TOPICS, "(fo.*)|(ba)"); - runner.setProperty(ConsumeKafka_0_11.TOPIC_TYPE, "pattern"); - runner.setProperty(ConsumeKafka_0_11.GROUP_ID, groupName); - runner.setProperty(ConsumeKafka_0_11.AUTO_OFFSET_RESET, ConsumeKafka_0_11.OFFSET_EARLIEST); - runner.run(1, false); - - verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject()); - verify(mockLease, times(3)).continuePolling(); - verify(mockLease, times(2)).poll(); - verify(mockLease, times(1)).commit(); - verify(mockLease, times(1)).close(); - verifyNoMoreInteractions(mockConsumerPool); - verifyNoMoreInteractions(mockLease); - } - - @Test - public void validateGetErrorMessages() throws Exception { - String groupName = "validateGetErrorMessages"; - - when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease); - when(mockLease.continuePolling()).thenReturn(true, false); - when(mockLease.commit()).thenReturn(Boolean.FALSE); - - ConsumeKafka_0_11 proc = new ConsumeKafka_0_11() { - @Override - protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { - return mockConsumerPool; - } - }; - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); - runner.setProperty(ConsumeKafka_0_11.TOPICS, "foo,bar"); - runner.setProperty(ConsumeKafka_0_11.GROUP_ID, groupName); - runner.setProperty(ConsumeKafka_0_11.AUTO_OFFSET_RESET, ConsumeKafka_0_11.OFFSET_EARLIEST); - runner.run(1, false); - - verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject()); - verify(mockLease, times(2)).continuePolling(); - verify(mockLease, times(1)).poll(); - verify(mockLease, times(1)).commit(); - verify(mockLease, times(1)).close(); - verifyNoMoreInteractions(mockConsumerPool); - verifyNoMoreInteractions(mockLease); - } - @Test public void testJaasConfiguration() throws Exception { ConsumeKafka_0_11 consumeKafka = new ConsumeKafka_0_11(); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java new file mode 100644 index 000000000000..9a3b3d9db2c5 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor 
license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.kafka.pubsub; + +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +public class ITConsumeKafka { + + ConsumerLease mockLease = null; + ConsumerPool mockConsumerPool = null; + + @Before + public void setup() { + mockLease = mock(ConsumerLease.class); + mockConsumerPool = mock(ConsumerPool.class); + } + + @Test + public void validateGetAllMessages() throws Exception { + String groupName = "validateGetAllMessages"; + + when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease); + when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE); + when(mockLease.commit()).thenReturn(Boolean.TRUE); + + ConsumeKafka_0_11 proc = new ConsumeKafka_0_11() { + @Override + protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { + return mockConsumerPool; + } + }; + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); + runner.setProperty(ConsumeKafka_0_11.TOPICS, "foo,bar"); + runner.setProperty(ConsumeKafka_0_11.GROUP_ID, groupName); + runner.setProperty(ConsumeKafka_0_11.AUTO_OFFSET_RESET, ConsumeKafka_0_11.OFFSET_EARLIEST); + runner.run(1, false); + + verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject()); + verify(mockLease, times(3)).continuePolling(); + verify(mockLease, times(2)).poll(); + verify(mockLease, times(1)).commit(); + verify(mockLease, times(1)).close(); + verifyNoMoreInteractions(mockConsumerPool); + verifyNoMoreInteractions(mockLease); + } + + @Test + public void validateGetAllMessagesPattern() throws Exception { + String groupName = "validateGetAllMessagesPattern"; + + when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease); + when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE); + when(mockLease.commit()).thenReturn(Boolean.TRUE); + + ConsumeKafka_0_11 proc = new ConsumeKafka_0_11() { + @Override + protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { + return mockConsumerPool; + } + }; + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); + runner.setProperty(ConsumeKafka_0_11.TOPICS, "(fo.*)|(ba)"); + 
runner.setProperty(ConsumeKafka_0_11.TOPIC_TYPE, "pattern"); + runner.setProperty(ConsumeKafka_0_11.GROUP_ID, groupName); + runner.setProperty(ConsumeKafka_0_11.AUTO_OFFSET_RESET, ConsumeKafka_0_11.OFFSET_EARLIEST); + runner.run(1, false); + + verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject()); + verify(mockLease, times(3)).continuePolling(); + verify(mockLease, times(2)).poll(); + verify(mockLease, times(1)).commit(); + verify(mockLease, times(1)).close(); + verifyNoMoreInteractions(mockConsumerPool); + verifyNoMoreInteractions(mockLease); + } + + @Test + public void validateGetErrorMessages() throws Exception { + String groupName = "validateGetErrorMessages"; + + when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease); + when(mockLease.continuePolling()).thenReturn(true, false); + when(mockLease.commit()).thenReturn(Boolean.FALSE); + + ConsumeKafka_0_11 proc = new ConsumeKafka_0_11() { + @Override + protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { + return mockConsumerPool; + } + }; + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); + runner.setProperty(ConsumeKafka_0_11.TOPICS, "foo,bar"); + runner.setProperty(ConsumeKafka_0_11.GROUP_ID, groupName); + runner.setProperty(ConsumeKafka_0_11.AUTO_OFFSET_RESET, ConsumeKafka_0_11.OFFSET_EARLIEST); + runner.run(1, false); + + verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject()); + verify(mockLease, times(2)).continuePolling(); + verify(mockLease, times(1)).poll(); + verify(mockLease, times(1)).commit(); + verify(mockLease, times(1)).close(); + verifyNoMoreInteractions(mockConsumerPool); + verifyNoMoreInteractions(mockLease); + } + +} diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java index c4b0140b7b83..9062e1edb343 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java @@ -20,24 +20,13 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.ByteArrayDeserializer; -import org.apache.nifi.components.PropertyValue; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Test; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.junit.Assume.assumeFalse; import org.junit.Before; -import static org.mockito.Matchers.anyObject; -import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; public class ConsumeKafkaTest { @@ -105,103 +94,4 @@ public void validatePropertiesValidation() throws Exception { assertTrue(e.getMessage().contains("must contain at least one character 
that is not white space")); } } - - @Test - public void validateGetAllMessages() throws Exception { - String groupName = "validateGetAllMessages"; - - when(mockConsumerPool.obtainConsumer(anyObject())).thenReturn(mockLease); - when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE); - when(mockLease.commit()).thenReturn(Boolean.TRUE); - - ConsumeKafka proc = new ConsumeKafka() { - @Override - protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { - return mockConsumerPool; - } - }; - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); - runner.setProperty(ConsumeKafka.TOPICS, "foo,bar"); - runner.setProperty(ConsumeKafka.GROUP_ID, groupName); - runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST); - runner.run(1, false); - - verify(mockConsumerPool, times(1)).obtainConsumer(anyObject()); - verify(mockLease, times(3)).continuePolling(); - verify(mockLease, times(2)).poll(); - verify(mockLease, times(1)).commit(); - verify(mockLease, times(1)).close(); - verifyNoMoreInteractions(mockConsumerPool); - verifyNoMoreInteractions(mockLease); - } - - @Test - public void validateGetErrorMessages() throws Exception { - String groupName = "validateGetErrorMessages"; - - when(mockConsumerPool.obtainConsumer(anyObject())).thenReturn(mockLease); - when(mockLease.continuePolling()).thenReturn(true, false); - when(mockLease.commit()).thenReturn(Boolean.FALSE); - - ConsumeKafka proc = new ConsumeKafka() { - @Override - protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { - return mockConsumerPool; - } - }; - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); - runner.setProperty(ConsumeKafka.TOPICS, "foo,bar"); - runner.setProperty(ConsumeKafka.GROUP_ID, groupName); - runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST); - runner.run(1, false); - - verify(mockConsumerPool, times(1)).obtainConsumer(anyObject()); - verify(mockLease, times(2)).continuePolling(); - verify(mockLease, times(1)).poll(); - verify(mockLease, times(1)).commit(); - verify(mockLease, times(1)).close(); - verifyNoMoreInteractions(mockConsumerPool); - verifyNoMoreInteractions(mockLease); - } - - private boolean isWindowsEnvironment() { - return System.getProperty("os.name").toLowerCase().startsWith("windows"); - } - - @Test - public void validateConsumerRetainer() throws Exception { - assumeFalse(isWindowsEnvironment());//skip if on windows - final ConsumerPool consumerPool = mock(ConsumerPool.class); - - final ConsumeKafka processor = new ConsumeKafka() { - @Override - protected ConsumerPool createConsumerPool(ProcessContext context, ComponentLog log) { - return consumerPool; - } - }; - - final ComponentLog logger = mock(ComponentLog.class); - final ProcessorInitializationContext initializationContext = mock(ProcessorInitializationContext.class); - when(initializationContext.getLogger()).thenReturn(logger); - processor.initialize(initializationContext); - - final ProcessContext processContext = mock(ProcessContext.class); - final PropertyValue heartbeatInternalMsConfig = mock(PropertyValue.class); - when(heartbeatInternalMsConfig.isSet()).thenReturn(true); - when(heartbeatInternalMsConfig.asInteger()).thenReturn(100); - 
when(processContext.getProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG)).thenReturn(heartbeatInternalMsConfig); - processor.onScheduled(processContext); - - // retainConsumers should be called at least 1 time if it passed longer than heartbeat interval milliseconds. - Thread.sleep(200); - verify(consumerPool, atLeast(1)).retainConsumers(); - - processor.stopConnectionRetainer(); - - // After stopping connection retainer, it shouldn't interact with consumerPool. - Thread.sleep(200); - verifyNoMoreInteractions(consumerPool); - } } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java new file mode 100644 index 000000000000..084280a9200e --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.kafka.pubsub; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; + +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Test; +import static org.junit.Assume.assumeFalse; +import org.junit.Before; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +public class ITConsumeKafka { + + Consumer mockConsumer = null; + ConsumerLease mockLease = null; + ConsumerPool mockConsumerPool = null; + + @Before + public void setup() { + mockConsumer = mock(Consumer.class); + mockLease = mock(ConsumerLease.class); + mockConsumerPool = mock(ConsumerPool.class); + } + + @Test + public void validateGetAllMessages() throws Exception { + String groupName = "validateGetAllMessages"; + + when(mockConsumerPool.obtainConsumer(anyObject())).thenReturn(mockLease); + when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE); + when(mockLease.commit()).thenReturn(Boolean.TRUE); + + ConsumeKafka proc = new ConsumeKafka() { + @Override + protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { + return mockConsumerPool; + } + }; + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); + runner.setProperty(ConsumeKafka.TOPICS, "foo,bar"); + runner.setProperty(ConsumeKafka.GROUP_ID, groupName); + runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST); + runner.run(1, false); + + verify(mockConsumerPool, times(1)).obtainConsumer(anyObject()); + verify(mockLease, times(3)).continuePolling(); + verify(mockLease, times(2)).poll(); + verify(mockLease, times(1)).commit(); + verify(mockLease, times(1)).close(); + verifyNoMoreInteractions(mockConsumerPool); + verifyNoMoreInteractions(mockLease); + } + + @Test + public void validateGetErrorMessages() throws Exception { + String groupName = "validateGetErrorMessages"; + + when(mockConsumerPool.obtainConsumer(anyObject())).thenReturn(mockLease); + when(mockLease.continuePolling()).thenReturn(true, false); + when(mockLease.commit()).thenReturn(Boolean.FALSE); + + ConsumeKafka proc = new ConsumeKafka() { + @Override + protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { + return mockConsumerPool; + } + }; + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); + runner.setProperty(ConsumeKafka.TOPICS, "foo,bar"); + runner.setProperty(ConsumeKafka.GROUP_ID, groupName); + runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST); + runner.run(1, false); + + verify(mockConsumerPool, times(1)).obtainConsumer(anyObject()); + verify(mockLease, times(2)).continuePolling(); + verify(mockLease, times(1)).poll(); + verify(mockLease, times(1)).commit(); + verify(mockLease, times(1)).close(); + verifyNoMoreInteractions(mockConsumerPool); + 
verifyNoMoreInteractions(mockLease); + } + + private boolean isWindowsEnvironment() { + return System.getProperty("os.name").toLowerCase().startsWith("windows"); + } + + @Test + public void validateConsumerRetainer() throws Exception { + assumeFalse(isWindowsEnvironment()); // skip if on Windows + final ConsumerPool consumerPool = mock(ConsumerPool.class); + + final ConsumeKafka processor = new ConsumeKafka() { + @Override + protected ConsumerPool createConsumerPool(ProcessContext context, ComponentLog log) { + return consumerPool; + } + }; + + final ComponentLog logger = mock(ComponentLog.class); + final ProcessorInitializationContext initializationContext = mock(ProcessorInitializationContext.class); + when(initializationContext.getLogger()).thenReturn(logger); + processor.initialize(initializationContext); + + final ProcessContext processContext = mock(ProcessContext.class); + final PropertyValue heartbeatIntervalMsConfig = mock(PropertyValue.class); + when(heartbeatIntervalMsConfig.isSet()).thenReturn(true); + when(heartbeatIntervalMsConfig.asInteger()).thenReturn(100); + when(processContext.getProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG)).thenReturn(heartbeatIntervalMsConfig); + processor.onScheduled(processContext); + + // retainConsumers should be called at least once after more than the heartbeat interval has elapsed. + Thread.sleep(200); + verify(consumerPool, atLeast(1)).retainConsumers(); + + processor.stopConnectionRetainer(); + + // After stopping the connection retainer, it shouldn't interact with consumerPool. + Thread.sleep(200); + verifyNoMoreInteractions(consumerPool); + } +} diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java index 7b5a8fc5d9d7..10ac3984f426 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java @@ -18,17 +18,10 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Matchers.anyObject; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.ByteArrayDeserializer; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Before; @@ -99,97 +92,6 @@ public void validatePropertiesValidation() throws Exception { } } - @Test - public void validateGetAllMessages() throws Exception { - String groupName = "validateGetAllMessages"; - - when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease); - when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE); - when(mockLease.commit()).thenReturn(Boolean.TRUE); - - ConsumeKafka_1_0 proc = new ConsumeKafka_1_0() { - @Override - protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { - return
mockConsumerPool; - } - }; - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); - runner.setProperty(ConsumeKafka_1_0.TOPICS, "foo,bar"); - runner.setProperty(ConsumeKafka_1_0.GROUP_ID, groupName); - runner.setProperty(ConsumeKafka_1_0.AUTO_OFFSET_RESET, ConsumeKafka_1_0.OFFSET_EARLIEST); - runner.run(1, false); - - verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject()); - verify(mockLease, times(3)).continuePolling(); - verify(mockLease, times(2)).poll(); - verify(mockLease, times(1)).commit(); - verify(mockLease, times(1)).close(); - verifyNoMoreInteractions(mockConsumerPool); - verifyNoMoreInteractions(mockLease); - } - - @Test - public void validateGetAllMessagesPattern() throws Exception { - String groupName = "validateGetAllMessagesPattern"; - - when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease); - when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE); - when(mockLease.commit()).thenReturn(Boolean.TRUE); - - ConsumeKafka_1_0 proc = new ConsumeKafka_1_0() { - @Override - protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { - return mockConsumerPool; - } - }; - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); - runner.setProperty(ConsumeKafka_1_0.TOPICS, "(fo.*)|(ba)"); - runner.setProperty(ConsumeKafka_1_0.TOPIC_TYPE, "pattern"); - runner.setProperty(ConsumeKafka_1_0.GROUP_ID, groupName); - runner.setProperty(ConsumeKafka_1_0.AUTO_OFFSET_RESET, ConsumeKafka_1_0.OFFSET_EARLIEST); - runner.run(1, false); - - verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject()); - verify(mockLease, times(3)).continuePolling(); - verify(mockLease, times(2)).poll(); - verify(mockLease, times(1)).commit(); - verify(mockLease, times(1)).close(); - verifyNoMoreInteractions(mockConsumerPool); - verifyNoMoreInteractions(mockLease); - } - - @Test - public void validateGetErrorMessages() throws Exception { - String groupName = "validateGetErrorMessages"; - - when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease); - when(mockLease.continuePolling()).thenReturn(true, false); - when(mockLease.commit()).thenReturn(Boolean.FALSE); - - ConsumeKafka_1_0 proc = new ConsumeKafka_1_0() { - @Override - protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { - return mockConsumerPool; - } - }; - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); - runner.setProperty(ConsumeKafka_1_0.TOPICS, "foo,bar"); - runner.setProperty(ConsumeKafka_1_0.GROUP_ID, groupName); - runner.setProperty(ConsumeKafka_1_0.AUTO_OFFSET_RESET, ConsumeKafka_1_0.OFFSET_EARLIEST); - runner.run(1, false); - - verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject()); - verify(mockLease, times(2)).continuePolling(); - verify(mockLease, times(1)).poll(); - verify(mockLease, times(1)).commit(); - verify(mockLease, times(1)).close(); - verifyNoMoreInteractions(mockConsumerPool); - verifyNoMoreInteractions(mockLease); - } - @Test public void testJaasConfiguration() throws Exception { ConsumeKafka_1_0 consumeKafka = new ConsumeKafka_1_0(); diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java new file mode 100644 index 000000000000..69956e793879 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.kafka.pubsub; + +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +public class ITConsumeKafka { + + ConsumerLease mockLease = null; + ConsumerPool mockConsumerPool = null; + + @Before + public void setup() { + mockLease = mock(ConsumerLease.class); + mockConsumerPool = mock(ConsumerPool.class); + } + + @Test + public void validateGetAllMessages() throws Exception { + String groupName = "validateGetAllMessages"; + + when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease); + when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE); + when(mockLease.commit()).thenReturn(Boolean.TRUE); + + ConsumeKafka_1_0 proc = new ConsumeKafka_1_0() { + @Override + protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { + return mockConsumerPool; + } + }; + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); + runner.setProperty(ConsumeKafka_1_0.TOPICS, "foo,bar"); + runner.setProperty(ConsumeKafka_1_0.GROUP_ID, groupName); + runner.setProperty(ConsumeKafka_1_0.AUTO_OFFSET_RESET, ConsumeKafka_1_0.OFFSET_EARLIEST); + runner.run(1, false); + + verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject()); + verify(mockLease, times(3)).continuePolling(); + verify(mockLease, times(2)).poll(); + verify(mockLease, times(1)).commit(); + verify(mockLease, times(1)).close(); + verifyNoMoreInteractions(mockConsumerPool); + verifyNoMoreInteractions(mockLease); + } + + @Test + public void validateGetAllMessagesPattern() throws Exception { + String groupName = "validateGetAllMessagesPattern"; + + when(mockConsumerPool.obtainConsumer(anyObject(), 
anyObject())).thenReturn(mockLease); + when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE); + when(mockLease.commit()).thenReturn(Boolean.TRUE); + + ConsumeKafka_1_0 proc = new ConsumeKafka_1_0() { + @Override + protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { + return mockConsumerPool; + } + }; + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); + runner.setProperty(ConsumeKafka_1_0.TOPICS, "(fo.*)|(ba)"); + runner.setProperty(ConsumeKafka_1_0.TOPIC_TYPE, "pattern"); + runner.setProperty(ConsumeKafka_1_0.GROUP_ID, groupName); + runner.setProperty(ConsumeKafka_1_0.AUTO_OFFSET_RESET, ConsumeKafka_1_0.OFFSET_EARLIEST); + runner.run(1, false); + + verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject()); + verify(mockLease, times(3)).continuePolling(); + verify(mockLease, times(2)).poll(); + verify(mockLease, times(1)).commit(); + verify(mockLease, times(1)).close(); + verifyNoMoreInteractions(mockConsumerPool); + verifyNoMoreInteractions(mockLease); + } + + @Test + public void validateGetErrorMessages() throws Exception { + String groupName = "validateGetErrorMessages"; + + when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease); + when(mockLease.continuePolling()).thenReturn(true, false); + when(mockLease.commit()).thenReturn(Boolean.FALSE); + + ConsumeKafka_1_0 proc = new ConsumeKafka_1_0() { + @Override + protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { + return mockConsumerPool; + } + }; + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); + runner.setProperty(ConsumeKafka_1_0.TOPICS, "foo,bar"); + runner.setProperty(ConsumeKafka_1_0.GROUP_ID, groupName); + runner.setProperty(ConsumeKafka_1_0.AUTO_OFFSET_RESET, ConsumeKafka_1_0.OFFSET_EARLIEST); + runner.run(1, false); + + verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject()); + verify(mockLease, times(2)).continuePolling(); + verify(mockLease, times(1)).poll(); + verify(mockLease, times(1)).commit(); + verify(mockLease, times(1)).close(); + verifyNoMoreInteractions(mockConsumerPool); + verifyNoMoreInteractions(mockLease); + } + +} diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/TestLumberjackSocketChannelHandler.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/ITLumberjackSocketChannelHandler.java similarity index 99% rename from nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/TestLumberjackSocketChannelHandler.java rename to nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/ITLumberjackSocketChannelHandler.java index ee5a040248e4..7d5418050961 100644 --- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/TestLumberjackSocketChannelHandler.java +++ b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/ITLumberjackSocketChannelHandler.java @@ -48,7 +48,7 @@ -public class 
TestLumberjackSocketChannelHandler { +public class ITLumberjackSocketChannelHandler { private EventFactory eventFactory; private ChannelHandlerFactory channelHandlerFactory; private BlockingQueue byteBuffers; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/ListenSyslogGroovyTest.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/ITListenSyslogGroovy.groovy similarity index 96% rename from nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/ListenSyslogGroovyTest.groovy rename to nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/ITListenSyslogGroovy.groovy index 1c6b4f8e86f5..1f442253c3b5 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/ListenSyslogGroovyTest.groovy +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/ITListenSyslogGroovy.groovy @@ -78,7 +78,7 @@ class ListenSyslogGroovyTest extends GroovyTestCase { Assert.assertTrue(port > 0) // write some TCP messages to the port in the background - final Thread sender = new Thread(new TestListenSyslog.SingleConnectionSocketSender(port, numMessages, 100, ZERO_LENGTH_MESSAGE)) + final Thread sender = new Thread(new ITListenSyslog.SingleConnectionSocketSender(port, numMessages, 100, ZERO_LENGTH_MESSAGE)) sender.setDaemon(true) sender.start() diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITListenSyslog.java new file mode 100644 index 000000000000..a94bb20ab6fd --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITListenSyslog.java @@ -0,0 +1,402 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.standard; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processors.standard.syslog.SyslogAttributes; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.nio.charset.Charset; +import java.util.List; +import org.apache.nifi.processors.standard.TestListenSyslog.DatagramSender; + +public class ITListenSyslog { + + static final Logger LOGGER = LoggerFactory.getLogger(ITListenSyslog.class); + + static final String PRI = "34"; + static final String SEV = "2"; + static final String FAC = "4"; + static final String TIME = "Oct 13 15:43:23"; + static final String HOST = "localhost.home"; + static final String BODY = "some message"; + + static final String VALID_MESSAGE = "<" + PRI + ">" + TIME + " " + HOST + " " + BODY; + static final String VALID_MESSAGE_TCP = "<" + PRI + ">" + TIME + " " + HOST + " " + BODY + "\n"; + static final String INVALID_MESSAGE = "this is not valid\n"; + + @Test + public void testUDP() throws IOException, InterruptedException { + final ListenSyslog proc = new ListenSyslog(); + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.UDP_VALUE.getValue()); + runner.setProperty(ListenSyslog.PORT, "0"); + + // schedule to start listening on a random port + final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); + final ProcessContext context = runner.getProcessContext(); + proc.onScheduled(context); + + final int numMessages = 20; + final int port = proc.getPort(); + Assert.assertTrue(port > 0); + + // write some UDP messages to the port in the background + final Thread sender = new Thread(new DatagramSender(port, numMessages, 10, VALID_MESSAGE)); + sender.setDaemon(true); + sender.start(); + + // call onTrigger until we read all datagrams, or 30 seconds have passed + try { + int numTransferred = 0; + long timeout = System.currentTimeMillis() + 30000; + + while (numTransferred < numMessages && System.currentTimeMillis() < timeout) { + Thread.sleep(10); + proc.onTrigger(context, processSessionFactory); + numTransferred = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size(); + } + Assert.assertEquals("Did not process all the datagrams", numMessages, numTransferred); + + MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0); + checkFlowFile(flowFile, 0, ListenSyslog.UDP_VALUE.getValue()); + + final List<ProvenanceEventRecord> events = runner.getProvenanceEvents(); + Assert.assertNotNull(events); + Assert.assertEquals(numMessages, events.size()); + + final ProvenanceEventRecord event = events.get(0); + Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType()); + Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("udp")); + + } finally { + // unschedule to close connections + proc.onUnscheduled(); + } + } + + @Test + public void testTCPSingleConnection() throws IOException,
InterruptedException { + final ListenSyslog proc = new ListenSyslog(); + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue()); + runner.setProperty(ListenSyslog.PORT, "0"); + + // schedule to start listening on a random port + final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); + final ProcessContext context = runner.getProcessContext(); + proc.onScheduled(context); + + // Allow time for the processor to perform its scheduled start + Thread.sleep(500); + + final int numMessages = 20; + final int port = proc.getPort(); + Assert.assertTrue(port > 0); + + // write some TCP messages to the port in the background + final Thread sender = new Thread(new SingleConnectionSocketSender(port, numMessages, 10, VALID_MESSAGE_TCP)); + sender.setDaemon(true); + sender.start(); + + // call onTrigger until we read all messages, or 30 seconds have passed + try { + int numTransferred = 0; + long timeout = System.currentTimeMillis() + 30000; + + while (numTransferred < numMessages && System.currentTimeMillis() < timeout) { + Thread.sleep(10); + proc.onTrigger(context, processSessionFactory); + numTransferred = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size(); + } + Assert.assertEquals("Did not process all the messages", numMessages, numTransferred); + + MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0); + checkFlowFile(flowFile, 0, ListenSyslog.TCP_VALUE.getValue()); + + final List<ProvenanceEventRecord> events = runner.getProvenanceEvents(); + Assert.assertNotNull(events); + Assert.assertEquals(numMessages, events.size()); + + final ProvenanceEventRecord event = events.get(0); + Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType()); + Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("tcp")); + + } finally { + // unschedule to close connections + proc.onUnscheduled(); + } + } + + @Test + public void testTCPSingleConnectionWithNewLines() throws IOException, InterruptedException { + final ListenSyslog proc = new ListenSyslog(); + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue()); + runner.setProperty(ListenSyslog.PORT, "0"); + + // schedule to start listening on a random port + final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); + final ProcessContext context = runner.getProcessContext(); + proc.onScheduled(context); + + final int numMessages = 3; + final int port = proc.getPort(); + Assert.assertTrue(port > 0); + + // send 3 messages as 1 + final String multipleMessages = VALID_MESSAGE_TCP + "\n" + VALID_MESSAGE_TCP + "\n" + VALID_MESSAGE_TCP + "\n"; + final Thread sender = new Thread(new SingleConnectionSocketSender(port, 1, 10, multipleMessages)); + sender.setDaemon(true); + sender.start(); + + // call onTrigger until we read all messages, or 30 seconds have passed + try { + int numTransferred = 0; + long timeout = System.currentTimeMillis() + 30000; + + while (numTransferred < numMessages && System.currentTimeMillis() < timeout) { + Thread.sleep(10); + proc.onTrigger(context, processSessionFactory); + numTransferred = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size(); + } + Assert.assertEquals("Did not process all the messages", numMessages, numTransferred); + + MockFlowFile flowFile =
runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0); + checkFlowFile(flowFile, 0, ListenSyslog.TCP_VALUE.getValue()); + + final List<ProvenanceEventRecord> events = runner.getProvenanceEvents(); + Assert.assertNotNull(events); + Assert.assertEquals(numMessages, events.size()); + + final ProvenanceEventRecord event = events.get(0); + Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType()); + Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("tcp")); + + } finally { + // unschedule to close connections + proc.onUnscheduled(); + } + } + + @Test + public void testTCPMultipleConnection() throws IOException, InterruptedException { + final ListenSyslog proc = new ListenSyslog(); + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue()); + runner.setProperty(ListenSyslog.MAX_CONNECTIONS, "5"); + runner.setProperty(ListenSyslog.PORT, "0"); + + // schedule to start listening on a random port + final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); + final ProcessContext context = runner.getProcessContext(); + proc.onScheduled(context); + + final int numMessages = 20; + final int port = proc.getPort(); + Assert.assertTrue(port > 0); + + // write some TCP messages to the port in the background + final Thread sender = new Thread(new MultiConnectionSocketSender(port, numMessages, 10, VALID_MESSAGE_TCP)); + sender.setDaemon(true); + sender.start(); + + // call onTrigger until we read all messages, or 30 seconds have passed + try { + int numTransferred = 0; + long timeout = System.currentTimeMillis() + 30000; + + while (numTransferred < numMessages && System.currentTimeMillis() < timeout) { + Thread.sleep(10); + proc.onTrigger(context, processSessionFactory); + numTransferred = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size(); + } + Assert.assertEquals("Did not process all the messages", numMessages, numTransferred); + + MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0); + checkFlowFile(flowFile, 0, ListenSyslog.TCP_VALUE.getValue()); + + final List<ProvenanceEventRecord> events = runner.getProvenanceEvents(); + Assert.assertNotNull(events); + Assert.assertEquals(numMessages, events.size()); + + final ProvenanceEventRecord event = events.get(0); + Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType()); + Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("tcp")); + + } finally { + // unschedule to close connections + proc.onUnscheduled(); + } + } + + @Test + public void testInvalid() throws IOException, InterruptedException { + final ListenSyslog proc = new ListenSyslog(); + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue()); + runner.setProperty(ListenSyslog.PORT, "0"); + + // schedule to start listening on a random port + final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); + final ProcessContext context = runner.getProcessContext(); + proc.onScheduled(context); + + final int numMessages = 10; + final int port = proc.getPort(); + Assert.assertTrue(port > 0); + + // write some TCP messages to the port in the background + final Thread sender = new Thread(new SingleConnectionSocketSender(port, numMessages, 100, INVALID_MESSAGE)); + sender.setDaemon(true); + sender.start(); + +
// call onTrigger until we read all messages, or 30 seconds have passed + try { + int numTransferred = 0; + long timeout = System.currentTimeMillis() + 30000; + + while (numTransferred < numMessages && System.currentTimeMillis() < timeout) { + Thread.sleep(50); + proc.onTrigger(context, processSessionFactory); + numTransferred = runner.getFlowFilesForRelationship(ListenSyslog.REL_INVALID).size(); + } + + // all messages should be transferred to invalid + Assert.assertEquals("Did not process all the messages", numMessages, numTransferred); + + } finally { + // unschedule to close connections + proc.onUnscheduled(); + } + } + + private void checkFlowFile(final MockFlowFile flowFile, final int port, final String protocol) { + flowFile.assertContentEquals(VALID_MESSAGE.replace("\n", "")); + Assert.assertEquals(PRI, flowFile.getAttribute(SyslogAttributes.PRIORITY.key())); + Assert.assertEquals(SEV, flowFile.getAttribute(SyslogAttributes.SEVERITY.key())); + Assert.assertEquals(FAC, flowFile.getAttribute(SyslogAttributes.FACILITY.key())); + Assert.assertEquals(TIME, flowFile.getAttribute(SyslogAttributes.TIMESTAMP.key())); + Assert.assertEquals(HOST, flowFile.getAttribute(SyslogAttributes.HOSTNAME.key())); + Assert.assertEquals(BODY, flowFile.getAttribute(SyslogAttributes.BODY.key())); + Assert.assertEquals("true", flowFile.getAttribute(SyslogAttributes.VALID.key())); + Assert.assertEquals(String.valueOf(port), flowFile.getAttribute(SyslogAttributes.PORT.key())); + Assert.assertEquals(protocol, flowFile.getAttribute(SyslogAttributes.PROTOCOL.key())); + Assert.assertTrue(!StringUtils.isBlank(flowFile.getAttribute(SyslogAttributes.SENDER.key()))); + } + + /** + * Sends a given number of messages over a single TCP connection to the given port. + */ + public static final class SingleConnectionSocketSender implements Runnable { + + final int port; + final int numMessages; + final long delay; + final String message; + + public SingleConnectionSocketSender(int port, int numMessages, long delay, String message) { + this.port = port; + this.numMessages = numMessages; + this.delay = delay; + this.message = message; + } + + @Override + public void run() { + byte[] bytes = message.getBytes(Charset.forName("UTF-8")); + final ByteBuffer buffer = ByteBuffer.allocate(bytes.length); + + try (SocketChannel channel = SocketChannel.open()) { + channel.connect(new InetSocketAddress("localhost", port)); + + for (int i = 0; i < numMessages; i++) { + buffer.clear(); + buffer.put(bytes); + buffer.flip(); + + while (buffer.hasRemaining()) { + channel.write(buffer); + } + Thread.sleep(delay); + } + } catch (IOException e) { + LOGGER.error(e.getMessage(), e); + } catch (InterruptedException e) { + LOGGER.error(e.getMessage(), e); + } + } + } + + /** + * Sends a given number of messages, each over a new TCP connection, to the given port.
+ */ + public static final class MultiConnectionSocketSender implements Runnable { + + final int port; + final int numMessages; + final long delay; + final String message; + + public MultiConnectionSocketSender(int port, int numMessages, long delay, String message) { + this.port = port; + this.numMessages = numMessages; + this.delay = delay; + this.message = message; + } + + @Override + public void run() { + byte[] bytes = message.getBytes(Charset.forName("UTF-8")); + final ByteBuffer buffer = ByteBuffer.allocate(bytes.length); + + for (int i = 0; i < numMessages; i++) { + try (SocketChannel channel = SocketChannel.open()) { + channel.connect(new InetSocketAddress("localhost", port)); + + buffer.clear(); + buffer.put(bytes); + buffer.flip(); + + while (buffer.hasRemaining()) { + channel.write(buffer); + } + Thread.sleep(delay); + } catch (IOException e) { + LOGGER.error(e.getMessage(), e); + } catch (InterruptedException e) { + LOGGER.error(e.getMessage(), e); + } + } + } + } + +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java index f96ff228c61a..2c199c1b4c4a 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java @@ -41,7 +41,6 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; -import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -62,215 +61,10 @@ public class TestListenSyslog { static final String HOST = "localhost.home"; static final String BODY = "some message"; - static final String VALID_MESSAGE = "<" + PRI + ">" + TIME + " " + HOST + " " + BODY ; + static final String VALID_MESSAGE = "<" + PRI + ">" + TIME + " " + HOST + " " + BODY; static final String VALID_MESSAGE_TCP = "<" + PRI + ">" + TIME + " " + HOST + " " + BODY + "\n"; static final String INVALID_MESSAGE = "this is not valid\n"; - @Test - public void testUDP() throws IOException, InterruptedException { - final ListenSyslog proc = new ListenSyslog(); - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.UDP_VALUE.getValue()); - runner.setProperty(ListenSyslog.PORT, "0"); - - // schedule to start listening on a random port - final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); - final ProcessContext context = runner.getProcessContext(); - proc.onScheduled(context); - - final int numMessages = 20; - final int port = proc.getPort(); - Assert.assertTrue(port > 0); - - // write some UDP messages to the port in the background - final Thread sender = new Thread(new DatagramSender(port, numMessages, 10, VALID_MESSAGE)); - sender.setDaemon(true); - sender.start(); - - // call onTrigger until we read all datagrams, or 30 seconds passed - try { - int numTransferred = 0; - long timeout = System.currentTimeMillis() + 30000; - - while (numTransferred < numMessages && System.currentTimeMillis() < timeout) { - Thread.sleep(10); - proc.onTrigger(context, processSessionFactory); - numTransferred = 
runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size(); - } - Assert.assertEquals("Did not process all the datagrams", numMessages, numTransferred); - - MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0); - checkFlowFile(flowFile, 0, ListenSyslog.UDP_VALUE.getValue()); - - final List<ProvenanceEventRecord> events = runner.getProvenanceEvents(); - Assert.assertNotNull(events); - Assert.assertEquals(numMessages, events.size()); - - final ProvenanceEventRecord event = events.get(0); - Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType()); - Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("udp")); - - } finally { - // unschedule to close connections - proc.onUnscheduled(); - } - } - - @Test - public void testTCPSingleConnection() throws IOException, InterruptedException { - final ListenSyslog proc = new ListenSyslog(); - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue()); - runner.setProperty(ListenSyslog.PORT, "0"); - - // schedule to start listening on a random port - final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); - final ProcessContext context = runner.getProcessContext(); - proc.onScheduled(context); - - // Allow time for the processor to perform its scheduled start - Thread.sleep(500); - - final int numMessages = 20; - final int port = proc.getPort(); - Assert.assertTrue(port > 0); - - // write some TCP messages to the port in the background - final Thread sender = new Thread(new SingleConnectionSocketSender(port, numMessages, 10, VALID_MESSAGE_TCP)); - sender.setDaemon(true); - sender.start(); - - // call onTrigger until we read all messages, or 30 seconds passed - try { - int nubTransferred = 0; - long timeout = System.currentTimeMillis() + 30000; - - while (nubTransferred < numMessages && System.currentTimeMillis() < timeout) { - Thread.sleep(10); - proc.onTrigger(context, processSessionFactory); - nubTransferred = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size(); - } - Assert.assertEquals("Did not process all the messages", numMessages, nubTransferred); - - MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0); - checkFlowFile(flowFile, 0, ListenSyslog.TCP_VALUE.getValue()); - - final List<ProvenanceEventRecord> events = runner.getProvenanceEvents(); - Assert.assertNotNull(events); - Assert.assertEquals(numMessages, events.size()); - - final ProvenanceEventRecord event = events.get(0); - Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType()); - Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("tcp")); - - } finally { - // unschedule to close connections - proc.onUnscheduled(); - } - } - - @Test - public void testTCPSingleConnectionWithNewLines() throws IOException, InterruptedException { - final ListenSyslog proc = new ListenSyslog(); - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue()); - runner.setProperty(ListenSyslog.PORT, "0"); - - // schedule to start listening on a random port - final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); - final ProcessContext context = runner.getProcessContext(); - proc.onScheduled(context); - - final int numMessages = 3; - final int port = proc.getPort(); -
Assert.assertTrue(port > 0); - - // send 3 messages as 1 - final String multipleMessages = VALID_MESSAGE_TCP + "\n" + VALID_MESSAGE_TCP + "\n" + VALID_MESSAGE_TCP + "\n"; - final Thread sender = new Thread(new SingleConnectionSocketSender(port, 1, 10, multipleMessages)); - sender.setDaemon(true); - sender.start(); - - // call onTrigger until we read all messages, or 30 seconds passed - try { - int nubTransferred = 0; - long timeout = System.currentTimeMillis() + 30000; - - while (nubTransferred < numMessages && System.currentTimeMillis() < timeout) { - Thread.sleep(10); - proc.onTrigger(context, processSessionFactory); - nubTransferred = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size(); - } - Assert.assertEquals("Did not process all the messages", numMessages, nubTransferred); - - MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0); - checkFlowFile(flowFile, 0, ListenSyslog.TCP_VALUE.getValue()); - - final List<ProvenanceEventRecord> events = runner.getProvenanceEvents(); - Assert.assertNotNull(events); - Assert.assertEquals(numMessages, events.size()); - - final ProvenanceEventRecord event = events.get(0); - Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType()); - Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("tcp")); - - } finally { - // unschedule to close connections - proc.onUnscheduled(); - } - } - - @Test - public void testTCPMultipleConnection() throws IOException, InterruptedException { - final ListenSyslog proc = new ListenSyslog(); - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue()); - runner.setProperty(ListenSyslog.MAX_CONNECTIONS, "5"); - runner.setProperty(ListenSyslog.PORT, "0"); - - // schedule to start listening on a random port - final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); - final ProcessContext context = runner.getProcessContext(); - proc.onScheduled(context); - - final int numMessages = 20; - final int port = proc.getPort(); - Assert.assertTrue(port > 0); - - // write some TCP messages to the port in the background - final Thread sender = new Thread(new MultiConnectionSocketSender(port, numMessages, 10, VALID_MESSAGE_TCP)); - sender.setDaemon(true); - sender.start(); - - // call onTrigger until we read all messages, or 30 seconds passed - try { - int nubTransferred = 0; - long timeout = System.currentTimeMillis() + 30000; - - while (nubTransferred < numMessages && System.currentTimeMillis() < timeout) { - Thread.sleep(10); - proc.onTrigger(context, processSessionFactory); - nubTransferred = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size(); - } - Assert.assertEquals("Did not process all the messages", numMessages, nubTransferred); - - MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0); - checkFlowFile(flowFile, 0, ListenSyslog.TCP_VALUE.getValue()); - - final List<ProvenanceEventRecord> events = runner.getProvenanceEvents(); - Assert.assertNotNull(events); - Assert.assertEquals(numMessages, events.size()); - - final ProvenanceEventRecord event = events.get(0); - Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType()); - Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("tcp")); - - } finally { - // unschedule to close connections - proc.onUnscheduled(); - } - } - @Test public void
testBatching() throws IOException, InterruptedException { final ListenSyslog proc = new ListenSyslog(); @@ -324,47 +118,6 @@ public void testBatching() throws IOException, InterruptedException { } } - @Test - public void testInvalid() throws IOException, InterruptedException { - final ListenSyslog proc = new ListenSyslog(); - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue()); - runner.setProperty(ListenSyslog.PORT, "0"); - - // schedule to start listening on a random port - final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); - final ProcessContext context = runner.getProcessContext(); - proc.onScheduled(context); - - final int numMessages = 10; - final int port = proc.getPort(); - Assert.assertTrue(port > 0); - - // write some TCP messages to the port in the background - final Thread sender = new Thread(new SingleConnectionSocketSender(port, numMessages, 100, INVALID_MESSAGE)); - sender.setDaemon(true); - sender.start(); - - // call onTrigger until we read all messages, or 30 seconds passed - try { - int nubTransferred = 0; - long timeout = System.currentTimeMillis() + 30000; - - while (nubTransferred < numMessages && System.currentTimeMillis() < timeout) { - Thread.sleep(50); - proc.onTrigger(context, processSessionFactory); - nubTransferred = runner.getFlowFilesForRelationship(ListenSyslog.REL_INVALID).size(); - } - - // all messages should be transferred to invalid - Assert.assertEquals("Did not process all the messages", numMessages, nubTransferred); - - } finally { - // unschedule to close connections - proc.onUnscheduled(); - } - } - @Test public void testParsingError() throws IOException { final FailParseProcessor proc = new FailParseProcessor(); @@ -431,21 +184,6 @@ public byte[] getData() { runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0).assertContentEquals(VALID_MESSAGE); } - - private void checkFlowFile(final MockFlowFile flowFile, final int port, final String protocol) { - flowFile.assertContentEquals(VALID_MESSAGE.replace("\n", "")); - Assert.assertEquals(PRI, flowFile.getAttribute(SyslogAttributes.PRIORITY.key())); - Assert.assertEquals(SEV, flowFile.getAttribute(SyslogAttributes.SEVERITY.key())); - Assert.assertEquals(FAC, flowFile.getAttribute(SyslogAttributes.FACILITY.key())); - Assert.assertEquals(TIME, flowFile.getAttribute(SyslogAttributes.TIMESTAMP.key())); - Assert.assertEquals(HOST, flowFile.getAttribute(SyslogAttributes.HOSTNAME.key())); - Assert.assertEquals(BODY, flowFile.getAttribute(SyslogAttributes.BODY.key())); - Assert.assertEquals("true", flowFile.getAttribute(SyslogAttributes.VALID.key())); - Assert.assertEquals(String.valueOf(port), flowFile.getAttribute(SyslogAttributes.PORT.key())); - Assert.assertEquals(protocol, flowFile.getAttribute(SyslogAttributes.PROTOCOL.key())); - Assert.assertTrue(!StringUtils.isBlank(flowFile.getAttribute(SyslogAttributes.SENDER.key()))); - } - /** * Sends a given number of datagrams to the given port. 
*/ @@ -470,51 +208,7 @@ public void run() { try (DatagramChannel channel = DatagramChannel.open()) { channel.connect(new InetSocketAddress("localhost", port)); - for (int i=0; i < numMessages; i++) { - buffer.clear(); - buffer.put(bytes); - buffer.flip(); - - while(buffer.hasRemaining()) { - channel.write(buffer); - } - - Thread.sleep(delay); - } - } catch (IOException e) { - LOGGER.error(e.getMessage(), e); - } catch (InterruptedException e) { - LOGGER.error(e.getMessage(), e); - } - } - } - - /** - * Sends a given number of datagrams to the given port. - */ - public static final class SingleConnectionSocketSender implements Runnable { - - final int port; - final int numMessages; - final long delay; - final String message; - - public SingleConnectionSocketSender(int port, int numMessages, long delay, String message) { - this.port = port; - this.numMessages = numMessages; - this.delay = delay; - this.message = message; - } - - @Override - public void run() { - byte[] bytes = message.getBytes(Charset.forName("UTF-8")); - final ByteBuffer buffer = ByteBuffer.allocate(bytes.length); - - try (SocketChannel channel = SocketChannel.open()) { - channel.connect(new InetSocketAddress("localhost", port)); - - for (int i=0; i < numMessages; i++) { + for (int i = 0; i < numMessages; i++) { buffer.clear(); buffer.put(bytes); buffer.flip(); @@ -522,6 +216,7 @@ public void run() { while (buffer.hasRemaining()) { channel.write(buffer); } + Thread.sleep(delay); } } catch (IOException e) { @@ -532,51 +227,9 @@ public void run() { } } - /** - * Sends a given number of datagrams to the given port. - */ - public static final class MultiConnectionSocketSender implements Runnable { - - final int port; - final int numMessages; - final long delay; - final String message; - - public MultiConnectionSocketSender(int port, int numMessages, long delay, String message) { - this.port = port; - this.numMessages = numMessages; - this.delay = delay; - this.message = message; - } - - @Override - public void run() { - byte[] bytes = message.getBytes(Charset.forName("UTF-8")); - final ByteBuffer buffer = ByteBuffer.allocate(bytes.length); - - for (int i=0; i < numMessages; i++) { - try (SocketChannel channel = SocketChannel.open()) { - channel.connect(new InetSocketAddress("localhost", port)); - - buffer.clear(); - buffer.put(bytes); - buffer.flip(); - - while (buffer.hasRemaining()) { - channel.write(buffer); - } - Thread.sleep(delay); - } catch (IOException e) { - LOGGER.error(e.getMessage(), e); - } catch (InterruptedException e) { - LOGGER.error(e.getMessage(), e); - } - } - } - } - // A mock version of ListenSyslog that will queue the provided events private static class FailParseProcessor extends ListenSyslog { + @Override protected SyslogParser getParser() { return new SyslogParser(StandardCharsets.UTF_8) { @@ -589,6 +242,7 @@ public SyslogEvent parseEvent(byte[] bytes, String sender) { } } private static class CannedMessageProcessor extends ListenSyslog { + private final Iterator eventItr; public CannedMessageProcessor(final List events) { diff --git a/pom.xml b/pom.xml index a5f408cef1ca..146a3a213943 100644 --- a/pom.xml +++ b/pom.xml @@ -1985,7 +1985,9 @@ + resources or credentials that cannot be explicitly provided. Also appropriate + for tests which depend on inter-thread and/or network behavior, or which have + timing considerations that could make the tests brittle in various environments.--> integration-tests
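A note for readers reusing this pattern: the IT prefix is what keeps these classes out of the default build. Maven Surefire (which backs mvn test) matches only **/Test*.java, **/*Test.java and **/*TestCase.java by default, so the renamed classes drop out of the normal unit-test run, while Maven Failsafe matches **/IT*.java, **/*IT.java and **/*ITCase.java and runs them during mvn verify. A minimal sketch of such wiring, assuming a stock Failsafe setup rather than NiFi's actual profile and integration-tests category configuration shown in the pom hunk above:

    <!-- hypothetical wiring, not taken from NiFi's pom.xml: binds the Failsafe
         goals so the IT*-named classes above run only when the build reaches
         the integration-test/verify phases (mvn verify or later) -->
    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-failsafe-plugin</artifactId>
        <executions>
            <execution>
                <goals>
                    <goal>integration-test</goal>
                    <goal>verify</goal>
                </goals>
            </execution>
        </executions>
    </plugin>

With that wiring, mvn test exercises only the unit tests, and mvn verify additionally executes the IT classes; this commit instead gates them through the integration-tests marker referenced at the end of the pom comment above, so the default travis/appveyor builds stay fast and deterministic.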