From 3a21cdac03f6b18145aeff25bd3d870f7de3088a Mon Sep 17 00:00:00 2001
From: Mateusz Dymczyk
Date: Wed, 28 Sep 2016 14:25:45 +0900
Subject: [PATCH] [HEXDEV-596] Encryption of H2O communication channels (#71)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* [HEXDEV-596] Encryption of H2O communication channels

A simple SSL/TLS based authentication/encryption mechanism for communication between H2O nodes is now supported.

H2O Security
===================

SSL internode security
------------------------------

By default, communication between H2O nodes is not encrypted for performance reasons. H2O now supports SSL/TLS authentication (basic handshake authentication) and data encryption for internode communication.

Usage
============

1) Hadoop.

The easiest way to enable SSL while running H2O via h2odriver is to pass the "-ssl_config" flag with no arguments. This tells h2odriver to automatically generate all the necessary files and distribute them to all mappers. Whether this distribution is secure depends on your YARN configuration.

hadoop jar h2odriver.jar -nodes 4 -mapperXmx 6g -output hdfsOutputDirName -ssl_config

The user can also manually generate the keystore/truststore and properties file as described in subsection 3) Standalone/AWS and run the following command to use them instead:

hadoop jar h2odriver.jar -nodes 4 -mapperXmx 6g -output hdfsOutputDirName -ssl_config ssl.properties

In that case all of the files (certificates and properties) have to be distributed to all the mapper nodes by the user.

2) Spark.

Please check the Sparkling Water documentation for instructions on how to enable SSL while running on Spark.

3) Standalone/AWS.

In this case the user has to generate the keystores, truststores and properties file manually.

a) Generate public/private keys and distribute them (see the "Keystore/truststore generation" section for more information).

b) Generate the "ssl.properties" file (for the full list of parameters check the "Configuration" section):

h2o_ssl_jks_internal=keystore.jks
h2o_ssl_jks_password=password
h2o_ssl_jts=truststore.jks
h2o_ssl_jts_password=password

c) To start an SSL enabled node, pass the location of the properties file using "-ssl_config":

java -jar h2o.jar -ssl_config ssl.properties

Configuration:
------------------------------

To enable this feature, set the -ssl_config parameter while starting an H2O node, pointing it to a configuration file (key=value format) containing the following values:

- h2o_ssl_jks_internal (optional) - a path (absolute or relative) to a keystore file used for internal SSL communication
- h2o_ssl_jks_password (optional) - the password to the internal keystore
- h2o_ssl_jts (optional) - a path (absolute or relative) to a truststore file used for internal SSL communication
- h2o_ssl_jts_password (optional) - the password to the internal truststore
- h2o_ssl_protocol (optional) - the protocol name used during encrypted communication (must be supported by the JVM). Defaults to TLSv1.2.
- h2o_ssl_enabled_algorithms (optional) - a comma-separated list of enabled cipher algorithms (ones supported by the JVM)

Should the first four parameters (jks_internal, jks_password, jts, jts_password) be missing, Java defaults will be used for all SSL related parameters. This is highly dependent on the Java version in use and in some cases might not work, therefore it is advised to set them explicitly. This has to be set for every node in the cluster.
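For illustration, a configuration file that sets all of the options above might look as follows (the keystore paths, passwords, protocol and cipher suite names are placeholder values chosen for this example, not defaults shipped with H2O):

h2o_ssl_jks_internal=keystore.jks
h2o_ssl_jks_password=password
h2o_ssl_jts=truststore.jks
h2o_ssl_jts_password=password
h2o_ssl_protocol=TLSv1.2
h2o_ssl_enabled_algorithms=TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_128_CBC_SHA256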
Every node needs to have access to both a Java keystore and a Java truststore containing the appropriate keys and certificates. This feature should not be used together with the "useUDP" flag: at this point in time we do not support UDP encryption through DTLS or any other protocol, so enabling UDP might result in unencrypted data transfers.

Keystore/truststore generation
------------------------------

Keystore/truststore creation and distribution are deployment specific and have to be handled by the end user. Basic keystore/truststore generation can be done using the keytool program, which ships with Java; documentation can be found at https://docs.oracle.com/javase/7/docs/technotes/tools/solaris/keytool.html. Each node should have a key pair generated, and all public keys should be imported into a single truststore, which should then be distributed to all the nodes.

The simplest (and not recommended) way would be to call:

keytool -genkeypair -keystore h2o-internal.jks -alias h2o-internal

then distribute the h2o-internal.jks file to all the nodes and set it as both the keystore and truststore in the SSL properties file.

A more secure way would be to:

1) run the same command on each node:

keytool -genkeypair -keystore h2o-internal.jks -alias h2o-internal

2) extract the certificate on each node:

keytool -export -keystore h2o-internal.jks -alias h2o-internal -file node#.cer

3) distribute all of the above certificates to each node and on each node create a truststore containing all of them (or put all certificates on one node, import them into a truststore and distribute that truststore to each node):

keytool -importcert -file node#.cer -keystore truststore.jks -alias node#

Performance
===================

Turning on SSL might result in a performance overhead (from 10% to 100%) for settings and algorithms that exchange data between nodes. Example benchmark on a 5 node cluster (6GB memory per node) working with a 5.8 million row dataset (580MB):

                 Non SSL      SSL
Parsing:         4.908s       5.304s
GLM modelling:   01:39.446    01:49.634
DL modelling:    11:53.54     28:06.738

Caveats and missing pieces
===================

- this feature CANNOT be used together with the "useUDP" flag; we currently do not support DTLS or any other encryption for UDP
- should you start a mixed cloud of SSL and non-SSL nodes, the SSL ones will fail to bootstrap while the non-SSL ones will become unresponsive
- we do not provide in-memory data encryption; H2O might spill data to disk in unencrypted form should swaps to disk occur. As a workaround, an encrypted drive is advised.
- we do not support encryption of data saved to disk, should the appropriate flags be enabled. As with the previous caveat, the user can use an encrypted drive to work around this issue.
- currently we support only SSL, no SASL

* Code review fixes
* Log tshark output for debug
* Tshark outdir typo fix
* More code review fixes
* Makes keystore work on IBM JDK
* Fix test ssl gradle property check
* Change ssl arg names as per code review result
* Test ssl uppercased
* Do not kill the node on bad TCP payload, ignore it after logging.
* Remove dead method * Don't kill the node on lack of sentinel in intial TCP - simply close the connection * Fix SSL encryption test compilation error --- gradle.properties | 3 + gradle/multiNodeTesting.gradle | 5 + h2o-algos/build.gradle | 15 +- .../java/water/network/SSLEncryptionTest.java | 84 +++++ h2o-algos/src/test/resources/ssl.properties | 5 + h2o-algos/src/test/resources/ssl2.properties | 5 + h2o-algos/src/test/resources/ssl3.properties | 5 + h2o-algos/testMultiNode.sh | 41 +- h2o-algos/testSSL.sh | 137 +++++++ h2o-bindings/bin/gen_all.py | 3 +- h2o-core/src/main/java/water/AutoBuffer.java | 19 +- h2o-core/src/main/java/water/H2O.java | 10 + h2o-core/src/main/java/water/H2ONode.java | 47 ++- .../main/java/water/H2OSecurityManager.java | 69 ++++ .../main/java/water/TCPReceiverThread.java | 46 ++- .../water/network/SSLContextException.java | 7 + .../java/water/network/SSLProperties.java | 22 ++ .../java/water/network/SSLSocketChannel.java | 352 ++++++++++++++++++ .../network/SSLSocketChannelFactory.java | 114 ++++++ .../java/water/network/SecurityUtils.java | 124 ++++++ .../water/network/SocketChannelFactory.java | 38 ++ .../water/network/SocketChannelUtils.java | 23 ++ h2o-core/src/test/java/water/TestUtil.java | 2 +- .../network/SSLSocketChannelFactoryTest.java | 220 +++++++++++ .../java/water/network/SecurityUtilsTest.java | 41 ++ h2o-core/src/test/resources/cacerts.jks | Bin 0 -> 956 bytes h2o-core/src/test/resources/keystore.jks | Bin 0 -> 2246 bytes h2o-core/testMultiNode.sh | 8 +- .../src/main/java/water/hadoop/h2odriver.java | 39 +- .../src/main/java/water/hadoop/h2omapper.java | 57 ++- h2o-parsers/h2o-avro-parser/testMultiNode.sh | 8 +- h2o-scala/testMultiNode.sh | 9 +- multiNodeUtils.sh | 20 + scripts/run.py | 32 +- scripts/run_hadoop_job.py | 11 +- 35 files changed, 1522 insertions(+), 99 deletions(-) create mode 100644 h2o-algos/src/test/java/water/network/SSLEncryptionTest.java create mode 100644 h2o-algos/src/test/resources/ssl.properties create mode 100644 h2o-algos/src/test/resources/ssl2.properties create mode 100644 h2o-algos/src/test/resources/ssl3.properties create mode 100755 h2o-algos/testSSL.sh create mode 100644 h2o-core/src/main/java/water/H2OSecurityManager.java create mode 100644 h2o-core/src/main/java/water/network/SSLContextException.java create mode 100644 h2o-core/src/main/java/water/network/SSLProperties.java create mode 100644 h2o-core/src/main/java/water/network/SSLSocketChannel.java create mode 100644 h2o-core/src/main/java/water/network/SSLSocketChannelFactory.java create mode 100644 h2o-core/src/main/java/water/network/SecurityUtils.java create mode 100644 h2o-core/src/main/java/water/network/SocketChannelFactory.java create mode 100644 h2o-core/src/main/java/water/network/SocketChannelUtils.java create mode 100644 h2o-core/src/test/java/water/network/SSLSocketChannelFactoryTest.java create mode 100644 h2o-core/src/test/java/water/network/SecurityUtilsTest.java create mode 100644 h2o-core/src/test/resources/cacerts.jks create mode 100644 h2o-core/src/test/resources/keystore.jks create mode 100644 multiNodeUtils.sh diff --git a/gradle.properties b/gradle.properties index 8d3ffc4639ec..d9c8c330e1c8 100644 --- a/gradle.properties +++ b/gradle.properties @@ -29,3 +29,6 @@ org.gradle.jvmargs='-XX:MaxPermSize=384m' # Used for h2o-bindings generation, to allow us to use an extended h2o.jar h2oJarfile='build/h2o.jar' + +# Run ssl tests +doTestSSL=false diff --git a/gradle/multiNodeTesting.gradle b/gradle/multiNodeTesting.gradle index 
d233f40c51a5..b83656cffd17 100644 --- a/gradle/multiNodeTesting.gradle +++ b/gradle/multiNodeTesting.gradle @@ -22,6 +22,11 @@ task testMultiNode(type: Exec) { environment "BUILD_DIR", project.buildDir def args = ['bash', './testMultiNode.sh'] + + if(project.hasProperty('doTestSSL')) { + args << 'ssl' + } + if (project.hasProperty("jacocoCoverage")) { args << 'jacoco' } diff --git a/h2o-algos/build.gradle b/h2o-algos/build.gradle index d140f4a7b204..2daa0e2a9271 100644 --- a/h2o-algos/build.gradle +++ b/h2o-algos/build.gradle @@ -11,6 +11,15 @@ dependencies { apply from: "${rootDir}/gradle/dataCheck.gradle" +task testSSLEncryption(type: Exec) { + dependsOn cpLibs, jar, testJar + if(project.hasProperty('doTestSSL')) { + commandLine 'bash', './testSSL.sh' + } else { + commandLine 'echo', 'SSL tests not enabled' + } +} + // The default 'test' behavior is broken in that it does not grok clusters. // For H2O, all tests need to be run on a cluster, where each JVM is // "free-running" - it's stdout/stderr are NOT hooked by another process. If @@ -22,10 +31,8 @@ apply from: "${rootDir}/gradle/dataCheck.gradle" // level) to files - then scrape the files later for test results. test { dependsOn ":h2o-core:testJar" - dependsOn smalldataCheck, cpLibs, jar, testJar, testSingleNode, testMultiNode + dependsOn smalldataCheck, cpLibs, jar, testJar, testSingleNode, testMultiNode, testSSLEncryption // Defeat task 'test' by running no tests. exclude '**' -} - -testMultiNode.shouldRunAfter testSingleNode +} \ No newline at end of file diff --git a/h2o-algos/src/test/java/water/network/SSLEncryptionTest.java b/h2o-algos/src/test/java/water/network/SSLEncryptionTest.java new file mode 100644 index 000000000000..9c95eb23ea9e --- /dev/null +++ b/h2o-algos/src/test/java/water/network/SSLEncryptionTest.java @@ -0,0 +1,84 @@ +package water.network; + +import hex.tree.gbm.GBM; +import hex.tree.gbm.GBMModel; +import org.junit.Assert; +import org.junit.Ignore; +import water.TestUtil; +import water.fvec.Frame; +import water.util.Log; + +import java.util.Date; + +import static hex.genmodel.utils.DistributionFamily.gaussian; + +/** + * This class is used to capture TCP packets while training a model + * The result is then used to check if SSL encryption is working properly + */ +@Ignore +public class SSLEncryptionTest extends TestUtil { + + public static void main(String[] args) { + if (args.length == 1) { + testGBMRegressionGaussianSSL(args[0]); + } else { + testGBMRegressionGaussianNonSSL(); + } + + System.exit(0); + } + + public static void testGBMRegressionGaussianNonSSL() { + stall_till_cloudsize(4); + testGBMRegressionGaussian(); + } + + public static void testGBMRegressionGaussianSSL(String prop) { + stall_till_cloudsize(new String[] {"-internal_security_conf", prop}, 4); + testGBMRegressionGaussian(); + } + + private static void testGBMRegressionGaussian() { + GBMModel gbm = null; + Frame fr = null, fr2 = null; + try { + Date start = new Date(); + + fr = parse_test_file("./smalldata/gbm_test/Mfgdata_gaussian_GBM_testing.csv"); + GBMModel.GBMParameters parms = new GBMModel.GBMParameters(); + parms._train = fr._key; + parms._distribution = gaussian; + parms._response_column = fr._names[1]; // Row in col 0, dependent in col 1, predictor in col 2 + parms._ntrees = 1; + parms._max_depth = 1; + parms._min_rows = 1; + parms._nbins = 20; + // Drop ColV2 0 (row), keep 1 (response), keep col 2 (only predictor), drop remaining cols + String[] xcols = parms._ignored_columns = new String[fr.numCols()-2]; + xcols[0] = 
fr._names[0]; + System.arraycopy(fr._names,3,xcols,1,fr.numCols()-3); + parms._learn_rate = 1.0f; + parms._score_each_iteration=true; + + GBM job = new GBM(parms); + gbm = job.trainModel().get(); + + Log.info(">>> GBM parsing and training took: " + (new Date().getTime() - start.getTime()) + " ms."); + + Assert.assertTrue(job.isStopped()); //HEX-1817 + + // Done building model; produce a score column with predictions + + Date scoringStart = new Date(); + + fr2 = gbm.score(fr); + + Log.info(">>> GBM scoring took: " + (new Date().getTime() - scoringStart.getTime()) + " ms."); + } finally { + if( fr != null ) fr .remove(); + if( fr2 != null ) fr2.remove(); + if( gbm != null ) gbm.remove(); + } + } +} diff --git a/h2o-algos/src/test/resources/ssl.properties b/h2o-algos/src/test/resources/ssl.properties new file mode 100644 index 000000000000..577a8ca36518 --- /dev/null +++ b/h2o-algos/src/test/resources/ssl.properties @@ -0,0 +1,5 @@ +h2o_ssl_protocol=TLSv1.2 +h2o_ssl_jks_internal=../h2o-core/src/test/resources/keystore.jks +h2o_ssl_jks_password=password +h2o_ssl_jts=../h2o-core/src/test/resources/cacerts.jks +h2o_ssl_jts_password=password \ No newline at end of file diff --git a/h2o-algos/src/test/resources/ssl2.properties b/h2o-algos/src/test/resources/ssl2.properties new file mode 100644 index 000000000000..bfc241edf60d --- /dev/null +++ b/h2o-algos/src/test/resources/ssl2.properties @@ -0,0 +1,5 @@ +h2o_ssl_protocol=TLSv1.2 +h2o_ssl_jks_internal=../../h2o-core/src/test/resources/keystore.jks +h2o_ssl_jks_password=password +h2o_ssl_jts=../../h2o-core/src/test/resources/cacerts.jks +h2o_ssl_jts_password=password \ No newline at end of file diff --git a/h2o-algos/src/test/resources/ssl3.properties b/h2o-algos/src/test/resources/ssl3.properties new file mode 100644 index 000000000000..b2af8be8a4cd --- /dev/null +++ b/h2o-algos/src/test/resources/ssl3.properties @@ -0,0 +1,5 @@ +h2o_ssl_protocol=TLSv1.2 +h2o_ssl_jks_internal=../../../h2o-core/src/test/resources/keystore.jks +h2o_ssl_jks_password=password +h2o_ssl_jts=../../../h2o-core/src/test/resources/cacerts.jks +h2o_ssl_jts_password=password \ No newline at end of file diff --git a/h2o-algos/testMultiNode.sh b/h2o-algos/testMultiNode.sh index f5b49dfee9e3..945b17e727d9 100755 --- a/h2o-algos/testMultiNode.sh +++ b/h2o-algos/testMultiNode.sh @@ -1,4 +1,5 @@ #!/bin/bash +source ../multiNodeUtils.sh # Argument parsing if [ "$1" = "jacoco" ] @@ -109,21 +110,21 @@ CLUSTER_BASEPORT_2=45000 CLUSTER_BASEPORT_3=46000 CLUSTER_BASEPORT_4=47000 CLUSTER_BASEPORT_5=48000 -$JVM water.H2O -name $CLUSTER_NAME.1 -baseport $CLUSTER_BASEPORT_1 -ga_opt_out 1> $OUTDIR/out.1.1 2>&1 & PID_11=$! -$JVM water.H2O -name $CLUSTER_NAME.1 -baseport $CLUSTER_BASEPORT_1 -ga_opt_out 1> $OUTDIR/out.1.2 2>&1 & PID_12=$! -$JVM water.H2O -name $CLUSTER_NAME.1 -baseport $CLUSTER_BASEPORT_1 -ga_opt_out 1> $OUTDIR/out.1.3 2>&1 & PID_13=$! -$JVM water.H2O -name $CLUSTER_NAME.2 -baseport $CLUSTER_BASEPORT_2 -ga_opt_out 1> $OUTDIR/out.2.1 2>&1 & PID_21=$! -$JVM water.H2O -name $CLUSTER_NAME.2 -baseport $CLUSTER_BASEPORT_2 -ga_opt_out 1> $OUTDIR/out.2.2 2>&1 & PID_22=$! -$JVM water.H2O -name $CLUSTER_NAME.2 -baseport $CLUSTER_BASEPORT_2 -ga_opt_out 1> $OUTDIR/out.2.3 2>&1 & PID_23=$! -$JVM water.H2O -name $CLUSTER_NAME.3 -baseport $CLUSTER_BASEPORT_3 -ga_opt_out 1> $OUTDIR/out.3.1 2>&1 & PID_31=$! -$JVM water.H2O -name $CLUSTER_NAME.3 -baseport $CLUSTER_BASEPORT_3 -ga_opt_out 1> $OUTDIR/out.3.2 2>&1 & PID_32=$! 
-$JVM water.H2O -name $CLUSTER_NAME.3 -baseport $CLUSTER_BASEPORT_3 -ga_opt_out 1> $OUTDIR/out.3.3 2>&1 & PID_33=$! -$JVM water.H2O -name $CLUSTER_NAME.4 -baseport $CLUSTER_BASEPORT_4 -ga_opt_out 1> $OUTDIR/out.4.1 2>&1 & PID_41=$! -$JVM water.H2O -name $CLUSTER_NAME.4 -baseport $CLUSTER_BASEPORT_4 -ga_opt_out 1> $OUTDIR/out.4.2 2>&1 & PID_42=$! -$JVM water.H2O -name $CLUSTER_NAME.4 -baseport $CLUSTER_BASEPORT_4 -ga_opt_out 1> $OUTDIR/out.4.3 2>&1 & PID_43=$! -$JVM water.H2O -name $CLUSTER_NAME.5 -baseport $CLUSTER_BASEPORT_5 -ga_opt_out 1> $OUTDIR/out.5.1 2>&1 & PID_51=$! -$JVM water.H2O -name $CLUSTER_NAME.5 -baseport $CLUSTER_BASEPORT_5 -ga_opt_out 1> $OUTDIR/out.5.2 2>&1 & PID_52=$! -$JVM water.H2O -name $CLUSTER_NAME.5 -baseport $CLUSTER_BASEPORT_5 -ga_opt_out 1> $OUTDIR/out.5.3 2>&1 & PID_53=$! +$JVM water.H2O -name $CLUSTER_NAME.1 -baseport $CLUSTER_BASEPORT_1 -ga_opt_out $SSL 1> $OUTDIR/out.1.1 2>&1 & PID_11=$! +$JVM water.H2O -name $CLUSTER_NAME.1 -baseport $CLUSTER_BASEPORT_1 -ga_opt_out $SSL 1> $OUTDIR/out.1.2 2>&1 & PID_12=$! +$JVM water.H2O -name $CLUSTER_NAME.1 -baseport $CLUSTER_BASEPORT_1 -ga_opt_out $SSL 1> $OUTDIR/out.1.3 2>&1 & PID_13=$! +$JVM water.H2O -name $CLUSTER_NAME.2 -baseport $CLUSTER_BASEPORT_2 -ga_opt_out $SSL 1> $OUTDIR/out.2.1 2>&1 & PID_21=$! +$JVM water.H2O -name $CLUSTER_NAME.2 -baseport $CLUSTER_BASEPORT_2 -ga_opt_out $SSL 1> $OUTDIR/out.2.2 2>&1 & PID_22=$! +$JVM water.H2O -name $CLUSTER_NAME.2 -baseport $CLUSTER_BASEPORT_2 -ga_opt_out $SSL 1> $OUTDIR/out.2.3 2>&1 & PID_23=$! +$JVM water.H2O -name $CLUSTER_NAME.3 -baseport $CLUSTER_BASEPORT_3 -ga_opt_out $SSL 1> $OUTDIR/out.3.1 2>&1 & PID_31=$! +$JVM water.H2O -name $CLUSTER_NAME.3 -baseport $CLUSTER_BASEPORT_3 -ga_opt_out $SSL 1> $OUTDIR/out.3.2 2>&1 & PID_32=$! +$JVM water.H2O -name $CLUSTER_NAME.3 -baseport $CLUSTER_BASEPORT_3 -ga_opt_out $SSL 1> $OUTDIR/out.3.3 2>&1 & PID_33=$! +$JVM water.H2O -name $CLUSTER_NAME.4 -baseport $CLUSTER_BASEPORT_4 -ga_opt_out $SSL 1> $OUTDIR/out.4.1 2>&1 & PID_41=$! +$JVM water.H2O -name $CLUSTER_NAME.4 -baseport $CLUSTER_BASEPORT_4 -ga_opt_out $SSL 1> $OUTDIR/out.4.2 2>&1 & PID_42=$! +$JVM water.H2O -name $CLUSTER_NAME.4 -baseport $CLUSTER_BASEPORT_4 -ga_opt_out $SSL 1> $OUTDIR/out.4.3 2>&1 & PID_43=$! +$JVM water.H2O -name $CLUSTER_NAME.5 -baseport $CLUSTER_BASEPORT_5 -ga_opt_out $SSL 1> $OUTDIR/out.5.1 2>&1 & PID_51=$! +$JVM water.H2O -name $CLUSTER_NAME.5 -baseport $CLUSTER_BASEPORT_5 -ga_opt_out $SSL 1> $OUTDIR/out.5.2 2>&1 & PID_52=$! +$JVM water.H2O -name $CLUSTER_NAME.5 -baseport $CLUSTER_BASEPORT_5 -ga_opt_out $SSL 1> $OUTDIR/out.5.3 2>&1 & PID_53=$! # If coverage is being run, then pass a system variable flag so that timeout limits are increased. if [ $JACOCO_ENABLED = true ] @@ -136,11 +137,11 @@ fi # Launch last driver JVM. All output redir'd at the OS level to sandbox files. echo Running h2o-algos junit tests... -($JVM -Ddoonly.tests=$DOONLY -Dbuild.id=$BUILD_ID -Dignore.tests=$IGNORE -Djob.name=$JOB_NAME -Dgit.commit=$GIT_COMMIT -Dgit.branch=$GIT_BRANCH -Dai.h2o.name=$CLUSTER_NAME.1 -Dai.h2o.baseport=$CLUSTER_BASEPORT_1 -Dai.h2o.ga_opt_out=yes $JACOCO_FLAG $JUNIT_RUNNER $JUNIT_TESTS_BOOT `cat $OUTDIR/tests.txt | awk 'NR%5==0'` 2>&1 ; echo $? > $OUTDIR/status.1) 1> $OUTDIR/out.1 2>&1 & PID_1=$! 
-($JVM -Ddoonly.tests=$DOONLY -Dbuild.id=$BUILD_ID -Dignore.tests=$IGNORE -Djob.name=$JOB_NAME -Dgit.commit=$GIT_COMMIT -Dgit.branch=$GIT_BRANCH -Dai.h2o.name=$CLUSTER_NAME.2 -Dai.h2o.baseport=$CLUSTER_BASEPORT_2 -Dai.h2o.ga_opt_out=yes $JACOCO_FLAG $JUNIT_RUNNER $JUNIT_TESTS_BOOT `cat $OUTDIR/tests.txt | awk 'NR%5==1'` 2>&1 ; echo $? > $OUTDIR/status.2) 1> $OUTDIR/out.2 2>&1 & PID_2=$! -($JVM -Ddoonly.tests=$DOONLY -Dbuild.id=$BUILD_ID -Dignore.tests=$IGNORE -Djob.name=$JOB_NAME -Dgit.commit=$GIT_COMMIT -Dgit.branch=$GIT_BRANCH -Dai.h2o.name=$CLUSTER_NAME.3 -Dai.h2o.baseport=$CLUSTER_BASEPORT_3 -Dai.h2o.ga_opt_out=yes $JACOCO_FLAG $JUNIT_RUNNER $JUNIT_TESTS_BOOT `cat $OUTDIR/tests.txt | awk 'NR%5==2'` 2>&1 ; echo $? > $OUTDIR/status.3) 1> $OUTDIR/out.3 2>&1 & PID_3=$! -($JVM -Ddoonly.tests=$DOONLY -Dbuild.id=$BUILD_ID -Dignore.tests=$IGNORE -Djob.name=$JOB_NAME -Dgit.commit=$GIT_COMMIT -Dgit.branch=$GIT_BRANCH -Dai.h2o.name=$CLUSTER_NAME.4 -Dai.h2o.baseport=$CLUSTER_BASEPORT_4 -Dai.h2o.ga_opt_out=yes $JACOCO_FLAG $JUNIT_RUNNER $JUNIT_TESTS_BOOT `cat $OUTDIR/tests.txt | awk 'NR%5==3'` 2>&1 ; echo $? > $OUTDIR/status.4) 1> $OUTDIR/out.4 2>&1 & PID_4=$! -($JVM -Ddoonly.tests=$DOONLY -Dbuild.id=$BUILD_ID -Dignore.tests=$IGNORE -Djob.name=$JOB_NAME -Dgit.commit=$GIT_COMMIT -Dgit.branch=$GIT_BRANCH -Dai.h2o.name=$CLUSTER_NAME.5 -Dai.h2o.baseport=$CLUSTER_BASEPORT_5 -Dai.h2o.ga_opt_out=yes $JACOCO_FLAG $JUNIT_RUNNER $JUNIT_TESTS_BOOT `cat $OUTDIR/tests.txt | awk 'NR%5==4'` 2>&1 ; echo $? > $OUTDIR/status.5) 1> $OUTDIR/out.5 2>&1 & PID_5=$! +($JVM $TEST_SSL -Ddoonly.tests=$DOONLY -Dbuild.id=$BUILD_ID -Dignore.tests=$IGNORE -Djob.name=$JOB_NAME -Dgit.commit=$GIT_COMMIT -Dgit.branch=$GIT_BRANCH -Dai.h2o.name=$CLUSTER_NAME.1 -Dai.h2o.baseport=$CLUSTER_BASEPORT_1 -Dai.h2o.ga_opt_out=yes $JACOCO_FLAG $JUNIT_RUNNER $JUNIT_TESTS_BOOT `cat $OUTDIR/tests.txt | awk 'NR%5==0'` 2>&1 ; echo $? > $OUTDIR/status.1) 1> $OUTDIR/out.1 2>&1 & PID_1=$! +($JVM $TEST_SSL -Ddoonly.tests=$DOONLY -Dbuild.id=$BUILD_ID -Dignore.tests=$IGNORE -Djob.name=$JOB_NAME -Dgit.commit=$GIT_COMMIT -Dgit.branch=$GIT_BRANCH -Dai.h2o.name=$CLUSTER_NAME.2 -Dai.h2o.baseport=$CLUSTER_BASEPORT_2 -Dai.h2o.ga_opt_out=yes $JACOCO_FLAG $JUNIT_RUNNER $JUNIT_TESTS_BOOT `cat $OUTDIR/tests.txt | awk 'NR%5==1'` 2>&1 ; echo $? > $OUTDIR/status.2) 1> $OUTDIR/out.2 2>&1 & PID_2=$! +($JVM $TEST_SSL -Ddoonly.tests=$DOONLY -Dbuild.id=$BUILD_ID -Dignore.tests=$IGNORE -Djob.name=$JOB_NAME -Dgit.commit=$GIT_COMMIT -Dgit.branch=$GIT_BRANCH -Dai.h2o.name=$CLUSTER_NAME.3 -Dai.h2o.baseport=$CLUSTER_BASEPORT_3 -Dai.h2o.ga_opt_out=yes $JACOCO_FLAG $JUNIT_RUNNER $JUNIT_TESTS_BOOT `cat $OUTDIR/tests.txt | awk 'NR%5==2'` 2>&1 ; echo $? > $OUTDIR/status.3) 1> $OUTDIR/out.3 2>&1 & PID_3=$! +($JVM $TEST_SSL -Ddoonly.tests=$DOONLY -Dbuild.id=$BUILD_ID -Dignore.tests=$IGNORE -Djob.name=$JOB_NAME -Dgit.commit=$GIT_COMMIT -Dgit.branch=$GIT_BRANCH -Dai.h2o.name=$CLUSTER_NAME.4 -Dai.h2o.baseport=$CLUSTER_BASEPORT_4 -Dai.h2o.ga_opt_out=yes $JACOCO_FLAG $JUNIT_RUNNER $JUNIT_TESTS_BOOT `cat $OUTDIR/tests.txt | awk 'NR%5==3'` 2>&1 ; echo $? > $OUTDIR/status.4) 1> $OUTDIR/out.4 2>&1 & PID_4=$! +($JVM $TEST_SSL -Ddoonly.tests=$DOONLY -Dbuild.id=$BUILD_ID -Dignore.tests=$IGNORE -Djob.name=$JOB_NAME -Dgit.commit=$GIT_COMMIT -Dgit.branch=$GIT_BRANCH -Dai.h2o.name=$CLUSTER_NAME.5 -Dai.h2o.baseport=$CLUSTER_BASEPORT_5 -Dai.h2o.ga_opt_out=yes $JACOCO_FLAG $JUNIT_RUNNER $JUNIT_TESTS_BOOT `cat $OUTDIR/tests.txt | awk 'NR%5==4'` 2>&1 ; echo $? 
> $OUTDIR/status.5) 1> $OUTDIR/out.5 2>&1 & PID_5=$! wait ${PID_1} ${PID_2} ${PID_3} ${PID_4} ${PID_5} 1> /dev/null 2>&1 grep EXECUTION $OUTDIR/out.* | sed -e "s/.*TEST \(.*\) EXECUTION TIME: \(.*\) (Wall.*/\2 \1/" | sort -gr | head -n 10 >> $OUTDIR/out.0 diff --git a/h2o-algos/testSSL.sh b/h2o-algos/testSSL.sh new file mode 100755 index 000000000000..ca9f33a7eaa0 --- /dev/null +++ b/h2o-algos/testSSL.sh @@ -0,0 +1,137 @@ +#!/bin/bash + +# Clean out any old sandbox, make a new one +OUTDIR=sandbox +rm -fr $OUTDIR; mkdir -p $OUTDIR + +# Check for os +SEP=: +case "`uname`" in + CYGWIN* ) + SEP=";" + ;; +esac + +function cleanup () { + kill -9 ${PID_1} ${PID_2} ${PID_3} ${PID_4} 1> /dev/null 2>&1 + wait 1> /dev/null 2>&1 +} + +function countDataCells () { + # Number of tokens we didn't find + COUNT=0 + # Number of tokens we looked for + TOTAL=0 + FILE=../smalldata/gbm_test/Mfgdata_gaussian_GBM_testing.csv + while IFS= read -r line; do + IFS=',' read -r -a array <<< "$line" + for el in "${array[@]}"; do + # I don't check for "\d+" since things like "1" and "11" can appear as part of SSL encrypted gibberish + # and it's not trivial to distinguish it from actual data + if [[ ! $el =~ \"[0-9]+\" ]]; then + grep -q -- "$el" sandbox/test.out + COUNT=$((COUNT + $?)) + TOTAL=$((TOTAL+1)) + fi + done + # Because the column names are mostly one letter they might actually appear + # in the encrypted TCP gibberish so we'll skip them but check the actual data + done <<< "$(sed 1d $FILE)" + echo "Found $((TOTAL-COUNT)) tokens from a total of $TOTAL" 1>&2 + # Number of tokens we found + echo $((TOTAL-COUNT)) +} + +function testOutput () { + # Grab the nonSSL data field from the packet body in human readable format + tshark -x -r $OUTDIR/h2o-nonSSL.pcap -T text | awk -F " " '{print $3}' > $OUTDIR/test_tmp.out + # Remove all newlines and spaces for future grep + cat $OUTDIR/test_tmp.out | awk 1 RS='\n' ORS= | awk '{gsub(/ /,"")}1' > $OUTDIR/test.out + + # Check that all the data we used as input is in the TCP dump in not encrypted form! + FOUND=$(countDataCells) + if [[ $FOUND -eq 0 ]]; then + echo "Haven't found any of the original data in the nonSSL TCP dump." + echo h2o-algos junit tests FAILED + exit 1 + fi + + # Grab the SSL data field from the packet body in human readable format + tshark -x -r $OUTDIR/h2o-SSL.pcap -T text | awk -F " " '{print $3}' > $OUTDIR/test_tmp.out + cat $OUTDIR/test_tmp.out | awk 1 RS='\n' ORS= | awk '{gsub(/ /,"")}1' > $OUTDIR/test.out + + # Check that none of the data we used as input is in the TCP dump in notencrypted form! + FOUND=$(countDataCells) + if [[ $FOUND -ne 0 ]]; then + echo "Found original data in the SSL TCP dump." + echo h2o-algos junit tests FAILED + exit 1 + fi + + echo h2o-algos junit tests PASSED + exit 0 +} + +trap cleanup SIGTERM SIGINT + +# Find java command +if [ -z "$TEST_JAVA_HOME" ]; then + # Use default + JAVA_CMD="java" +else + # Use test java home + JAVA_CMD="$TEST_JAVA_HOME/bin/java" + # Increase XMX since JAVA_HOME can point to java6 + JAVA6_REGEXP=".*1\.6.*" + if [[ $TEST_JAVA_HOME =~ $JAVA6_REGEXP ]]; then + JAVA_CMD="${JAVA_CMD}" + fi +fi + +JVM="nice $JAVA_CMD -ea -Xmx3g -Xms3g -cp build/libs/h2o-algos-test.jar${SEP}build/libs/h2o-algos.jar${SEP}../h2o-core/build/libs/h2o-core-test.jar${SEP}../h2o-core/build/libs/h2o-core.jar${SEP}../h2o-genmodel/build/libs/h2o-genmodel.jar${SEP}../lib/*" +echo "$JVM" > $OUTDIR/jvm_cmd.txt + +SSL="" +# Launch 3 helper JVMs. All output redir'd at the OS level to sandbox files. 
+CLUSTER_NAME=junit_cluster_$$ +CLUSTER_BASEPORT=44000 +$JVM water.H2O -name $CLUSTER_NAME -baseport $CLUSTER_BASEPORT -ga_opt_out $SSL 1> $OUTDIR/out.1 2>&1 & PID_1=$! +$JVM water.H2O -name $CLUSTER_NAME -baseport $CLUSTER_BASEPORT -ga_opt_out $SSL 1> $OUTDIR/out.2 2>&1 & PID_2=$! +$JVM water.H2O -name $CLUSTER_NAME -baseport $CLUSTER_BASEPORT -ga_opt_out $SSL 1> $OUTDIR/out.3 2>&1 & PID_3=$! + +INTERFACE=${TSHARK_INTERFACE:-"eth0"} + +echo Running nonSSL test on interface ${INTERFACE}... + +pwd + +tshark -i ${INTERFACE} -T fields -e data -w ${OUTDIR}/h2o-nonSSL.pcap 1> /dev/null 2>&1 & PID_4=$! + +java -Dai.h2o.name=$CLUSTER_NAME -ea \ + -cp "build/libs/h2o-algos-test.jar${SEP}build/libs/h2o-algos.jar${SEP}../h2o-core/build/libs/h2o-core.jar${SEP}../h2o-core/build/libs/h2o-core-test.jar${SEP}../h2o-genmodel/build/libs/h2o-genmodel.jar${SEP}../lib/*" \ + water.network.SSLEncryptionTest + +echo After test cleanup... + +cleanup + +SSL_CONFIG="src/test/resources/ssl.properties" +SSL="-internal_security_conf "$SSL_CONFIG +CLUSTER_NAME=$CLUSTER_NAME"_2" +$JVM water.H2O -name $CLUSTER_NAME -baseport $CLUSTER_BASEPORT -ga_opt_out $SSL 1> $OUTDIR/out.1 2>&1 & PID_1=$! +$JVM water.H2O -name $CLUSTER_NAME -baseport $CLUSTER_BASEPORT -ga_opt_out $SSL 1> $OUTDIR/out.2 2>&1 & PID_2=$! +$JVM water.H2O -name $CLUSTER_NAME -baseport $CLUSTER_BASEPORT -ga_opt_out $SSL 1> $OUTDIR/out.3 2>&1 & PID_3=$! + +echo Running SSL test... + +tshark -i ${INTERFACE} -T fields -e data -w ${OUTDIR}/h2o-SSL.pcap 1> /dev/null 2>&1 & PID_4=$! + +java -Dai.h2o.name=$CLUSTER_NAME -ea \ + -cp "build/libs/h2o-algos-test.jar${SEP}build/libs/h2o-algos.jar${SEP}../h2o-core/build/libs/h2o-core.jar${SEP}../h2o-core/build/libs/h2o-core-test.jar${SEP}../h2o-genmodel/build/libs/h2o-genmodel.jar${SEP}../lib/*" \ + water.network.SSLEncryptionTest src/test/resources/ssl.properties + +echo After test cleanup... + +cleanup + +testOutput diff --git a/h2o-bindings/bin/gen_all.py b/h2o-bindings/bin/gen_all.py index 9df68fd39c32..ca31881ba8e7 100755 --- a/h2o-bindings/bin/gen_all.py +++ b/h2o-bindings/bin/gen_all.py @@ -31,7 +31,8 @@ base_port=48000, xmx="4g", cp="", - output_dir=results_dir + output_dir=results_dir, + test_ssl=False ) cloud.start() cloud.wait_for_cloud_to_be_up() diff --git a/h2o-core/src/main/java/water/AutoBuffer.java b/h2o-core/src/main/java/water/AutoBuffer.java index fac54a828fcd..bc34ebe760ba 100644 --- a/h2o-core/src/main/java/water/AutoBuffer.java +++ b/h2o-core/src/main/java/water/AutoBuffer.java @@ -8,6 +8,7 @@ import java.util.ArrayList; import java.util.Random; +import water.network.SocketChannelUtils; import water.util.Log; import water.util.TwoDimTable; @@ -130,7 +131,7 @@ public final class AutoBuffer { /** Incoming TCP request. Make a read-mode AutoBuffer from the open Channel, * figure the originating H2ONode from the first few bytes read. */ - AutoBuffer( SocketChannel sock ) throws IOException { + AutoBuffer( ByteChannel sock, InetAddress remoteAddress ) throws IOException { _chan = sock; raisePriority(); // Make TCP priority high _bb = BBP_BIG.make(); // Get a big / TPC-sized ByteBuffer @@ -138,7 +139,7 @@ public final class AutoBuffer { _read = true; // Reading by default _firstPage = true; // Read Inet from socket, port from the stream, figure out H2ONode - _h2o = H2ONode.intern(sock.socket().getInetAddress(), getPort()); + _h2o = H2ONode.intern(remoteAddress, getPort()); _firstPage = true; // Yes, must reset this. 
_time_start_ms = System.currentTimeMillis(); _persist = Value.TCP; @@ -425,14 +426,14 @@ public final int close() { assert x == 0xab : "AB.close instead of 0xab sentinel got "+x+", "+this; assert _chan != null; // chan set by incoming reader, since we KNOW it is a TCP // Write the reader-handshake-byte. - ((SocketChannel)_chan).socket().getOutputStream().write(0xcd); + SocketChannelUtils.underlyingSocketChannel(_chan).socket().getOutputStream().write(0xcd); // do not close actually reader socket; recycle it in TCPReader thread } else { // Writer? put1(0xab); // Write one-more byte ; might set _chan from null to not-null sendPartial(); // Finish partial writes; might set _chan from null to not-null assert _chan != null; // _chan is set not-null now! // Read the writer-handshake-byte. - int x = ((SocketChannel)_chan).socket().getInputStream().read(); + int x = SocketChannelUtils.underlyingSocketChannel(_chan).socket().getInputStream().read(); // either TCP con was dropped or other side closed connection without reading/confirming (e.g. task was cancelled). if( x == -1 ) throw new IOException("Other side closed connection before handshake byte read"); assert x == 0xcd : "Handshake; writer expected a 0xcd from reader but got "+x; @@ -442,7 +443,7 @@ public final int close() { _chan = null; // No channel now, since i/o error throw ioe; // Rethrow after close } finally { - if( !_read ) _h2o.freeTCPSocket((SocketChannel)_chan); // Recycle writable TCP channel + if( !_read ) _h2o.freeTCPSocket(_chan); // Recycle writable TCP channel restorePriority(); // And if we raised priority, lower it back } @@ -486,7 +487,7 @@ void drainClose() { if( chan != null ) { // Channel assumed sick from prior IOException try { chan.close(); } catch( IOException ignore ) {} // Silently close _chan = null; // No channel now! - if( !_read && chan instanceof SocketChannel) _h2o.freeTCPSocket((SocketChannel)chan); // Recycle writable TCP channel + if( !_read && SocketChannelUtils.isSocketChannel(chan)) _h2o.freeTCPSocket(chan); // Recycle writable TCP channel } restorePriority(); // And if we raised priority, lower it back bbFree(); @@ -496,7 +497,7 @@ void drainClose() { } // True if we opened a TCP channel, or will open one to close-and-send - boolean hasTCP() { assert !isClosed(); return _chan instanceof SocketChannel || (_h2o!=null && _bb.position() >= MTU); } + boolean hasTCP() { assert !isClosed(); return SocketChannelUtils.isSocketChannel(_chan) || (_h2o!=null && _bb.position() >= MTU); } // Size in bytes sent, after a close() int size() { return _size; } @@ -527,7 +528,7 @@ public final byte[] bufClose() { // over with. 
private void raisePriority() { if(_oldPrior == -1){ - assert _chan instanceof SocketChannel; + assert SocketChannelUtils.isSocketChannel(_chan); _oldPrior = Thread.currentThread().getPriority(); Thread.currentThread().setPriority(Thread.MAX_PRIORITY-1); } @@ -650,7 +651,7 @@ private ByteBuffer sendPartial() { long ns = System.nanoTime(); while( _bb.hasRemaining() ) { _chan.write(_bb); - if( RANDOM_TCP_DROP != null &&_chan instanceof SocketChannel && RANDOM_TCP_DROP.nextInt(100) == 0 ) + if( RANDOM_TCP_DROP != null && SocketChannelUtils.isSocketChannel(_chan) && RANDOM_TCP_DROP.nextInt(100) == 0 ) throw new IOException("Random TCP Write Fail"); } _time_io_ns += (System.nanoTime()-ns); diff --git a/h2o-core/src/main/java/water/H2O.java b/h2o-core/src/main/java/water/H2O.java index c7152566a2ab..986ad39c76f4 100644 --- a/h2o-core/src/main/java/water/H2O.java +++ b/h2o-core/src/main/java/water/H2O.java @@ -160,6 +160,9 @@ public static void printHelp() { " -login_conf \n" + " LoginService configuration file\n" + "\n" + + " -internal_security_conf \n" + + " Path (absolute or relative) to a file containing all internal security related configurations\n" + + "\n" + "Cloud formation behavior:\n" + "\n" + " New H2O nodes join together to form a cloud at startup time.\n" + @@ -298,6 +301,9 @@ public static void printHelp() { /** -login_conf is login configuration service file on local filesystem */ public String login_conf = null; + /** -internal_security_conf path (absolute or relative) to a file containing all internal security related configurations */ + public String internal_security_conf = null; + //----------------------------------------------------------------------------------- // Debugging //----------------------------------------------------------------------------------- @@ -521,6 +527,10 @@ else if (s.matches("login_conf")) { i = s.incrementAndCheck(i, args); ARGS.login_conf = args[i]; } + else if (s.matches("internal_security_conf")) { + i = s.incrementAndCheck(i, args); + ARGS.internal_security_conf = args[i]; + } else { parseFailed("Unknown argument (" + s + ")"); } diff --git a/h2o-core/src/main/java/water/H2ONode.java b/h2o-core/src/main/java/water/H2ONode.java index 2b45d6da5e3e..460771633e94 100644 --- a/h2o-core/src/main/java/water/H2ONode.java +++ b/h2o-core/src/main/java/water/H2ONode.java @@ -4,6 +4,7 @@ import java.net.*; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import java.nio.channels.ByteChannel; import java.nio.channels.DatagramChannel; import java.nio.channels.SocketChannel; import java.util.*; @@ -14,6 +15,7 @@ import water.nbhm.NonBlockingHashMap; import water.nbhm.NonBlockingHashMapLong; import water.util.ArrayUtils; +import water.network.SocketChannelFactory; import water.util.Log; import water.util.MathUtils; import water.util.UnsafeUtils; @@ -28,6 +30,9 @@ */ public final class H2ONode extends Iced implements Comparable { + transient private SocketChannelFactory _socketFactory; + transient private H2OSecurityManager _security; + transient short _unique_idx; // Dense integer index, skipping 0. NOT cloud-wide unique. 
transient boolean _announcedLostContact; // True if heartbeat published a no-contact msg transient public long _last_heard_from; // Time in msec since we last heard from this Node @@ -110,6 +115,9 @@ private H2ONode( H2Okey key, short unique_idx ) { _unique_idx = unique_idx; _last_heard_from = System.currentTimeMillis(); _heartbeat = new HeartBeat(); + + _security = new H2OSecurityManager(); + _socketFactory = new SocketChannelFactory(_security); } // --------------- @@ -226,19 +234,19 @@ public static H2ONode self(InetAddress local) { // A queue of available TCP sockets // re-usable TCP socket opened to this node, or null. // This is essentially a BlockingQueue/Stack that allows null. - private transient SocketChannel _socks[] = new SocketChannel[2]; + private transient ByteChannel _socks[] = new ByteChannel[2]; private transient int _socksAvail=_socks.length; // Count of concurrent TCP requests both incoming and outgoing static final AtomicInteger TCPS = new AtomicInteger(0); - SocketChannel getTCPSocket() throws IOException { + ByteChannel getTCPSocket() throws IOException { // Under lock, claim an existing open socket if possible synchronized(this) { // Limit myself to the number of open sockets from node-to-node while( _socksAvail == 0 ) try { wait(1000); } catch( InterruptedException ignored ) { } // Claim an open socket - SocketChannel sock = _socks[--_socksAvail]; + ByteChannel sock = _socks[--_socksAvail]; if( sock != null ) { if( sock.isOpen() ) return sock; // Return existing socket! // Else it's an already-closed socket, lower open TCP count @@ -257,12 +265,14 @@ SocketChannel getTCPSocket() throws IOException { bb.putChar((char)H2O.H2O_PORT); bb.put((byte)0xef); bb.flip(); - while(bb.hasRemaining()) - sock2.write(bb); + ByteChannel wrappedSocket = _socketFactory.clientChannel(sock2, _key.getHostName(), _key.getPort()); + while(bb.hasRemaining()) { + wrappedSocket.write(bb); + } TCPS.incrementAndGet(); // Cluster-wide counting - return sock2; + return wrappedSocket; } - synchronized void freeTCPSocket( SocketChannel sock ) { + synchronized void freeTCPSocket( ByteChannel sock ) { assert 0 <= _socksAvail && _socksAvail < _socks.length; assert TCPS.get() > 0; if( sock != null && !sock.isOpen() ) sock = null; @@ -285,7 +295,7 @@ synchronized void freeTCPSocket( SocketChannel sock ) { // Buffers the small messages together and sends the bytes over via TCP channel. class UDP_TCP_SendThread extends Thread { - private SocketChannel _chan; // Lazily made on demand; closed & reopened on error + private ByteChannel _chan; // Lazily made on demand; closed & reopened on error private final ByteBuffer _bb; // Reusable output large buffer public UDP_TCP_SendThread(){ @@ -349,7 +359,7 @@ void sendBuffer(){ _bb.flip(); // limit set to old position; position set to 0 while( _bb.hasRemaining() ) { try { - SocketChannel chan = _chan == null ? (_chan=openChan()) : _chan; + ByteChannel chan = _chan == null ? 
(_chan=openChan()) : _chan; chan.write(_bb); } catch(IOException ioe) { @@ -376,7 +386,7 @@ void sendBuffer(){ } // Open channel on first write attempt - private SocketChannel openChan() throws IOException { + private ByteChannel openChan() throws IOException { // Must make a fresh socket SocketChannel sock = SocketChannel.open(); sock.socket().setReuseAddress(true); @@ -389,9 +399,11 @@ private SocketChannel openChan() throws IOException { sock.socket().setTcpNoDelay(true); ByteBuffer bb = ByteBuffer.allocate(4).order(ByteOrder.nativeOrder()); bb.put((byte) 1).putChar((char) H2O.H2O_PORT).put((byte) 0xef).flip(); - while (bb.hasRemaining()) // Write out magic startup sequence - sock.write(bb); - return sock; + ByteChannel wrappedSocket = _socketFactory.clientChannel(sock, isa.getHostName(), isa.getPort()); + while (bb.hasRemaining()) { // Write out magic startup sequence + wrappedSocket.write(bb); + } + return wrappedSocket; } } @@ -572,4 +584,13 @@ void rebooted() { public final H2ONode read_impl( AutoBuffer ab ) { return intern(H2Okey.read(ab)); } public final AutoBuffer writeJSON_impl(AutoBuffer ab) { return ab.putJSONStr("node",_key.toString()); } public final H2ONode readJSON_impl( AutoBuffer ab ) { throw H2O.fail(); } + + + public SocketChannelFactory getSocketFactory() { + return _socketFactory; + } + + public H2OSecurityManager getSecurityManager() { + return _security; + } } diff --git a/h2o-core/src/main/java/water/H2OSecurityManager.java b/h2o-core/src/main/java/water/H2OSecurityManager.java new file mode 100644 index 000000000000..8c882a6964c5 --- /dev/null +++ b/h2o-core/src/main/java/water/H2OSecurityManager.java @@ -0,0 +1,69 @@ +package water; + +import water.network.SSLContextException; +import water.network.SSLSocketChannelFactory; +import water.util.Log; + +import java.io.IOException; +import java.nio.channels.ByteChannel; +import java.nio.channels.SocketChannel; + +/** + * Takes care of security. + * + * In the long run this class should manage all security aspects of H2O but currently some parts are handled + * in other parts of the codebase. + * + * An instance of this class should be instantiated for each H2O object + * and should follow its lifecycle. + * + * At this stage we support a simple shared secret, handshake based, authentication, which can be turned + * on with the h2o_ssl_enabled parameter. Should the communicating nodes not share a common shared secret + * communication between them will not be possible. While using this parameter the user should *not* enable + * the useUDP parameter, we do not support UDP encryption at this stage and all UDP datagrams will be + * sent in an unencrypted form! + * + * Current state of data encryption: + * + * - HTTP for FlowUI - currently we rely on Jetty's SSL capabilities, authentication can be performed with + * hash login, ldap login or kerberos. The location of secret keys used byt Jetty's SSL server should be + * passed to the jks parameter. + * + * - inter node communication - all TCP based communication is being authenticated and encrypted using SSL + * using JSSE (Java Secure Socket Extension) when then h2o_ssl_enabled parameter is passed. Keystore related + * parameter should also be used as per the documentation. Secure UDP communication through DTLS is not supported + * at this point in time thus useUDP should not be used for SSL enabled clouds. + * + * - in-memory data encryption - currently not supported, using an encrypted drive is recommended + * at least for the swap partition. 
+ * + * - data saved to disk - currently not supported, using an encrypted drive is recommended + * + */ +public class H2OSecurityManager { + + public boolean securityEnabled = false; + private SSLSocketChannelFactory sslSocketChannelFactory; + + H2OSecurityManager() { + try { + if (null != H2O.ARGS.internal_security_conf) { + this.sslSocketChannelFactory = new SSLSocketChannelFactory(); + this.securityEnabled = true; + } + } catch (SSLContextException e) { + Log.err("Node initialized with SSL enabled but failed to create SSLContext. " + + "Node initialization aborted."); + Log.err(e); + H2O.exit(1); + } + } + + public ByteChannel wrapServerChannel(SocketChannel channel) throws IOException { + return sslSocketChannelFactory.wrapServerChannel(channel); + } + + public ByteChannel wrapClientChannel(SocketChannel channel, String host, int port) throws IOException { + return sslSocketChannelFactory.wrapClientChannel(channel, host, port); + } +} diff --git a/h2o-core/src/main/java/water/TCPReceiverThread.java b/h2o-core/src/main/java/water/TCPReceiverThread.java index 675aae4aa1f5..23f22d612373 100644 --- a/h2o-core/src/main/java/water/TCPReceiverThread.java +++ b/h2o-core/src/main/java/water/TCPReceiverThread.java @@ -1,11 +1,14 @@ package water; import java.io.IOException; +import java.net.InetAddress; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import java.nio.channels.ByteChannel; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; +import water.network.SocketChannelFactory; import water.util.Log; import water.util.SB; @@ -19,8 +22,14 @@ public class TCPReceiverThread extends Thread { private ServerSocketChannel SOCK; + private SocketChannelFactory socketChannelFactory; - public TCPReceiverThread(ServerSocketChannel sock) { super("TCP-Accept"); SOCK = sock; } + public TCPReceiverThread( + ServerSocketChannel sock) { + super("TCP-Accept"); + SOCK = sock; + this.socketChannelFactory = H2O.SELF.getSocketFactory(); + } // The Run Method. // Started by main() on a single thread, this code manages reading TCP requests @@ -50,22 +59,30 @@ public void run() { // Block for TCP connection and setup to read from it. 
SocketChannel sock = SOCK.accept(); ByteBuffer bb = ByteBuffer.allocate(4).order(ByteOrder.nativeOrder()); + ByteChannel wrappedSocket = socketChannelFactory.serverChannel(sock); bb.limit(bb.capacity()); bb.position(0); - while(bb.hasRemaining()) // read first 8 bytes - sock.read(bb); + while(bb.hasRemaining()) { // read first 8 bytes + wrappedSocket.read(bb); + } bb.flip(); int chanType = bb.get(); // 1 - small , 2 - big int port = bb.getChar(); int sentinel = (0xFF) & bb.get(); - if(sentinel != 0xef) - throw H2O.fail("missing eom sentinel when opening new tcp channel"); + if(sentinel != 0xef) { + if(H2O.SELF.getSecurityManager().securityEnabled) { + throw new IOException("Missing EOM sentinel when opening new SSL tcp channel."); + } else { + throw H2O.fail("missing eom sentinel when opening new tcp channel"); + } + } // todo compare against current cloud, refuse the con if no match - H2ONode h2o = H2ONode.intern(sock.socket().getInetAddress(),port); + InetAddress inetAddress = sock.socket().getInetAddress(); + H2ONode h2o = H2ONode.intern(inetAddress,port); // Pass off the TCP connection to a separate reader thread switch( chanType ) { - case 1: new UDP_TCP_ReaderThread(h2o, sock).start(); break; - case 2: new TCPReaderThread(sock,new AutoBuffer(sock)).start(); break; + case 1: new UDP_TCP_ReaderThread(h2o, wrappedSocket).start(); break; + case 2: new TCPReaderThread(wrappedSocket,new AutoBuffer(wrappedSocket, inetAddress), inetAddress).start(); break; default: throw H2O.fail("unexpected channel type " + chanType + ", only know 1 - Small and 2 - Big"); } } catch( java.nio.channels.AsynchronousCloseException ex ) { @@ -82,12 +99,15 @@ public void run() { // A private thread for reading from this open socket. static class TCPReaderThread extends Thread { - public SocketChannel _sock; + public ByteChannel _sock; public AutoBuffer _ab; - public TCPReaderThread(SocketChannel sock, AutoBuffer ab) { + private final InetAddress address; + + public TCPReaderThread(ByteChannel sock, AutoBuffer ab, InetAddress address) { super("TCP-"+ab._h2o+"-"+(ab._h2o._tcp_readers++)); _sock = sock; _ab = ab; + this.address = address; setPriority(MAX_PRIORITY-1); } @@ -119,7 +139,7 @@ public void run() { // Reuse open sockets for the next task try { if( !_sock.isOpen() ) break; - _ab = new AutoBuffer(_sock); + _ab = new AutoBuffer(_sock, address); } catch( Exception e ) { // Exceptions here are *normal*, this is an idle TCP connection and // either the OS can time it out, or the cloud might shutdown. We @@ -136,11 +156,11 @@ public void run() { * byte array which is than passed on to FJQ. Each message is expected to * be MSG_SZ(2B) MSG BODY(MSG_SZ*B) EOM MARKER (1B - 0xef). 
*/ static class UDP_TCP_ReaderThread extends Thread { - private final SocketChannel _chan; + private final ByteChannel _chan; private final ByteBuffer _bb; private final H2ONode _h2o; - public UDP_TCP_ReaderThread(H2ONode h2o, SocketChannel chan) { + public UDP_TCP_ReaderThread(H2ONode h2o, ByteChannel chan) { super("UDP-TCP-READ-" + h2o); _h2o = h2o; _chan = chan; diff --git a/h2o-core/src/main/java/water/network/SSLContextException.java b/h2o-core/src/main/java/water/network/SSLContextException.java new file mode 100644 index 000000000000..e8490a85b717 --- /dev/null +++ b/h2o-core/src/main/java/water/network/SSLContextException.java @@ -0,0 +1,7 @@ +package water.network; + +public class SSLContextException extends Throwable { + SSLContextException(String msg, Throwable e) { + super(msg, e); + } +} diff --git a/h2o-core/src/main/java/water/network/SSLProperties.java b/h2o-core/src/main/java/water/network/SSLProperties.java new file mode 100644 index 000000000000..cf391ef81717 --- /dev/null +++ b/h2o-core/src/main/java/water/network/SSLProperties.java @@ -0,0 +1,22 @@ +package water.network; + +import java.util.Properties; + +class SSLProperties extends Properties { + + String[] h2o_ssl_enabled_algorithms() { + String algs = getProperty("h2o_ssl_enabled_algorithms"); + if(null != algs) { + return algs.split(","); + } + return null; + } + + String h2o_ssl_protocol() { return getProperty("h2o_ssl_protocol", "TLSv1.2"); } + + String h2o_ssl_jks_internal() { return getProperty("h2o_ssl_jks_internal"); } + String h2o_ssl_jks_password() { return getProperty("h2o_ssl_jks_password"); } + String h2o_ssl_jts() { return getProperty("h2o_ssl_jts") != null ? getProperty("h2o_ssl_jts") : getProperty("h2o_ssl_jks_internal"); } + String h2o_ssl_jts_password() { return getProperty("h2o_ssl_jts_password") != null ? getProperty("h2o_ssl_jts_password") : getProperty("h2o_ssl_jks_password"); } + +} diff --git a/h2o-core/src/main/java/water/network/SSLSocketChannel.java b/h2o-core/src/main/java/water/network/SSLSocketChannel.java new file mode 100644 index 000000000000..0323a16edb34 --- /dev/null +++ b/h2o-core/src/main/java/water/network/SSLSocketChannel.java @@ -0,0 +1,352 @@ +package water.network; + +import water.H2O; +import water.util.Log; + +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLEngineResult; +import javax.net.ssl.SSLException; +import javax.net.ssl.SSLSession; +import java.io.IOException; +import java.net.SocketException; +import java.nio.ByteBuffer; +import java.nio.channels.ByteChannel; +import java.nio.channels.SocketChannel; + +/** + * This class is based on: + * Oracle's JSSE guide. + * Oracle's SSLEngine demo. + * + * It's a simple wrapper around SocketChannels which enables SSL/TLS + * communication using {@link javax.net.ssl.SSLEngine}. 
+ */ +class SSLSocketChannel implements ByteChannel { + + // Empty buffer for handshakes + private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0); + + // Buffer holding encrypted outgoing data + private ByteBuffer netInBuffer; + // Buffer holding encrypted incoming data + private ByteBuffer netOutBuffer; + + // Buffer holding decrypted incoming data + private ByteBuffer peerAppData; + + private SocketChannel channel = null; + private SSLEngine sslEngine = null; + + private boolean closing = false; + private boolean closed = false; + + private boolean handshakeComplete = false; + + SSLSocketChannel(SocketChannel channel, SSLEngine sslEngine) throws IOException { + this.channel = channel; + this.sslEngine = sslEngine; + + sslEngine.setEnableSessionCreation(true); + SSLSession session = sslEngine.getSession(); + prepareBuffers(session); + + handshake(); + } + + @Override + public boolean isOpen() { + return channel.isOpen(); + } + + @Override + public void close() throws IOException { + closing = true; + sslEngine.closeOutbound(); + sslEngine.getSession().invalidate(); + netOutBuffer.clear(); + channel.close(); + closed = true; + } + + private void prepareBuffers(SSLSession session) throws SocketException { + int appBufferSize = session.getApplicationBufferSize(); + // Less is not more. More is more. Bigger than the app buffer size so successful unwraps() don't cause BUFFER_OVERFLOW + // Value 64 was based on other frameworks using it and some manual testing. Might require tuning in the future. + peerAppData = ByteBuffer.allocate(appBufferSize + 64); + + int netBufferSize = session.getPacketBufferSize(); + netInBuffer = ByteBuffer.allocate(netBufferSize); + netOutBuffer = ByteBuffer.allocate(netBufferSize); + + } + + // ----------------------------------------------------------- + // HANDSHAKE + // ----------------------------------------------------------- + + private SSLEngineResult.HandshakeStatus hs; + + private void handshake() throws IOException { + Log.debug("Starting SSL handshake..."); + sslEngine.beginHandshake(); + + hs = sslEngine.getHandshakeStatus(); + SSLEngineResult initHandshakeStatus; + + while (!handshakeComplete) { + switch (hs) { + case NOT_HANDSHAKING: { + //should never happen + throw new IOException("NOT_HANDSHAKING during handshake"); + } + case FINISHED: + handshakeComplete = !netOutBuffer.hasRemaining(); + break; + case NEED_WRAP: { + initHandshakeStatus = handshakeWrap(); + if ( initHandshakeStatus.getStatus() == SSLEngineResult.Status.OK ){ + if (hs == SSLEngineResult.HandshakeStatus.NEED_TASK) { + tasks(); + } + } + break; + } + case NEED_UNWRAP: { + initHandshakeStatus = handshakeUnwrap(); + if ( initHandshakeStatus.getStatus() == SSLEngineResult.Status.OK ){ + if (hs == SSLEngineResult.HandshakeStatus.NEED_TASK) { + tasks(); + } + } + break; + } + // SSL needs to perform some delegating tasks before it can continue. + // Those tasks will be run in the same thread and can be blocking. 
+ case NEED_TASK: + tasks(); + break; + } + } + Log.debug("SSL handshake finished successfully!"); + } + + private synchronized SSLEngineResult handshakeWrap() throws IOException { + netOutBuffer.clear(); + SSLEngineResult wrapResult = sslEngine.wrap(EMPTY_BUFFER, netOutBuffer); + netOutBuffer.flip(); + hs = wrapResult.getHandshakeStatus(); + channel.write(netOutBuffer); + return wrapResult; + } + + private synchronized SSLEngineResult handshakeUnwrap() throws IOException { + if (netInBuffer.position() == netInBuffer.limit()) { + netInBuffer.clear(); + } + + channel.read(netInBuffer); + + SSLEngineResult unwrapResult; + peerAppData.clear(); + + do { + netInBuffer.flip(); + + unwrapResult = sslEngine.unwrap(netInBuffer, peerAppData); + + netInBuffer.compact(); + + hs = unwrapResult.getHandshakeStatus(); + + switch (unwrapResult.getStatus()) { + case OK: + case BUFFER_UNDERFLOW: { + if (unwrapResult.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_TASK) { + tasks(); + } + break; + } + case BUFFER_OVERFLOW: { + int applicationBufferSize = sslEngine.getSession().getApplicationBufferSize(); + if (applicationBufferSize > peerAppData.capacity()) { + ByteBuffer b = ByteBuffer.allocate(applicationBufferSize + peerAppData.position()); + peerAppData.flip(); + b.put(peerAppData); + peerAppData = b; + } else { + peerAppData.compact(); + } + break; + } + default: + throw new IOException("Failed to SSL unwrap with status " + unwrapResult.getStatus()); + } + } while(unwrapResult.getStatus() == SSLEngineResult.Status.OK && + hs == SSLEngineResult.HandshakeStatus.NEED_UNWRAP); + + return unwrapResult; + } + + // ----------------------------------------------------------- + // READ AND WRITE + // ----------------------------------------------------------- + + @Override + public int read(ByteBuffer dst) throws IOException { + if (closing || closed) return -1; + + return unwrap(dst); + } + + private synchronized int unwrap(ByteBuffer dst) throws IOException { + int read = 0; + // We have outstanding data in our incoming decrypted buffer, use that data first to fill dst + if(!dst.hasRemaining()) { + return 0; + } + + if(peerAppData.position() != 0) { + read += copy(peerAppData, dst); + return read; + } + + if(netInBuffer.position() == 0) { + channel.read(netInBuffer); + } + + while(netInBuffer.position() != 0) { + netInBuffer.flip(); + + // We still might have left data here if dst was smaller than the amount of data in peerAppData + if(peerAppData.position() != 0) { + peerAppData.compact(); + } + + SSLEngineResult unwrapResult = sslEngine.unwrap(netInBuffer, peerAppData); + + switch (unwrapResult.getStatus()) { + case OK: { + unwrapResult.bytesProduced(); + if (unwrapResult.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_TASK) tasks(); + break; + } + case BUFFER_OVERFLOW: { + int applicationBufferSize = sslEngine.getSession().getApplicationBufferSize(); + if (applicationBufferSize > peerAppData.capacity()) { + int appSize = applicationBufferSize; + ByteBuffer b = ByteBuffer.allocate(appSize + peerAppData.position()); + peerAppData.flip(); + b.put(peerAppData); + peerAppData = b; + } else { + // We tried to unwrap data into peerAppData which means there's leftover in netInBuffer + // the upcoming read should read int potential new data after the leftover + netInBuffer.position(netInBuffer.limit()); + netInBuffer.limit(netInBuffer.capacity()); + peerAppData.compact(); + if(!dst.hasRemaining()) { + return read; + } + } + break; + } + case BUFFER_UNDERFLOW: { + int packetBufferSize = 
+                    if (packetBufferSize > netInBuffer.capacity()) {
+                        int netSize = packetBufferSize;
+                        if (netSize > netInBuffer.capacity()) {
+                            ByteBuffer b = ByteBuffer.allocate(netSize);
+                            netInBuffer.flip();
+                            b.put(netInBuffer);
+                            netInBuffer = b;
+                        }
+                    } else {
+                        // We have some leftover data from unwrap but not enough.
+                        // We need to read in more data from the socket AFTER the current data.
+                        netInBuffer.position(netInBuffer.limit());
+                        netInBuffer.limit(netInBuffer.capacity());
+                        channel.read(netInBuffer);
+                        continue;
+                    }
+                    break;
+                }
+                default:
+                    throw new IOException("Failed to SSL unwrap with status " + unwrapResult.getStatus());
+            }
+
+            if (peerAppData != dst && dst.hasRemaining()) {
+                peerAppData.flip();
+                read += copy(peerAppData, dst);
+                if(!dst.hasRemaining()) {
+                    netInBuffer.compact();
+                    return read;
+                }
+            }
+
+            netInBuffer.compact();
+        }
+        return read;
+    }
+
+    private int copy(ByteBuffer src, ByteBuffer dst) {
+        int toCopy = Math.min(src.remaining(), dst.remaining());
+
+        dst.put(src.array(), src.position(), toCopy);
+        src.position(src.position() + toCopy);
+
+        if(!src.hasRemaining()) {
+            src.clear();
+        }
+        return toCopy;
+    }
+
+    @Override
+    public int write(ByteBuffer src) throws IOException {
+        if(closing || closed) {
+            throw new IOException("Cannot perform socket write, the socket is closed (or being closed).");
+        }
+
+        int wrote = 0;
+        // src can be much bigger than what our SSL session allows to send in one go
+        while (src.hasRemaining()) {
+            netOutBuffer.clear();
+
+            SSLEngineResult wrapResult = sslEngine.wrap(src, netOutBuffer);
+            netOutBuffer.flip();
+
+            if (wrapResult.getStatus() == SSLEngineResult.Status.OK) {
+                if (wrapResult.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_TASK) tasks();
+            }
+
+            while (netOutBuffer.hasRemaining()) {
+                wrote += channel.write(netOutBuffer);
+            }
+        }
+
+        return wrote;
+    }
+
+    // -----------------------------------------------------------
+    // MISC
+    // -----------------------------------------------------------
+
+    private void tasks() {
+        Runnable r;
+        while ( (r = sslEngine.getDelegatedTask()) != null) {
+            r.run();
+        }
+        hs = sslEngine.getHandshakeStatus();
+    }
+
+    public SocketChannel channel() {
+        return channel;
+    }
+
+    SSLEngine getEngine() {
+        return sslEngine;
+    }
+
+    boolean isHandshakeComplete() {
+        return handshakeComplete;
+    }
+}
diff --git a/h2o-core/src/main/java/water/network/SSLSocketChannelFactory.java b/h2o-core/src/main/java/water/network/SSLSocketChannelFactory.java
new file mode 100644
index 000000000000..da9543a272eb
--- /dev/null
+++ b/h2o-core/src/main/java/water/network/SSLSocketChannelFactory.java
@@ -0,0 +1,114 @@
+package water.network;
+
+import water.H2O;
+import water.util.Log;
+
+import javax.net.ssl.*;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.channels.ByteChannel;
+import java.nio.channels.SocketChannel;
+import java.security.*;
+import java.security.cert.CertificateException;
+
+public class SSLSocketChannelFactory {
+
+    private SSLContext sslContext = null;
+    private SSLProperties properties = null;
+
+    public SSLSocketChannelFactory() throws SSLContextException {
+        try {
+            SSLProperties props = new SSLProperties();
+            props.load(new FileInputStream(H2O.ARGS.internal_security_conf));
+            init(props);
+        } catch (IOException e) {
+            Log.err("Failed to initialize SSL context.", e);
+            throw new SSLContextException("Failed to initialize SSL context.", e);
+        }
+    }
+
+    public SSLSocketChannelFactory(SSLProperties props) throws
SSLContextException {
+        init(props);
+    }
+
+    private void init(SSLProperties props) throws SSLContextException {
+        properties = props;
+        try {
+            if (requiredParamsPresent()) {
+                this.sslContext = SSLContext.getInstance(properties.h2o_ssl_protocol());
+                this.sslContext.init(keyManager(), trustManager(), null);
+            } else {
+                this.sslContext = SSLContext.getDefault();
+            }
+        } catch (NoSuchAlgorithmException e) {
+            Log.err("Failed to initialize SSL context.", e);
+            throw new SSLContextException("Failed to initialize SSL context.", e);
+        } catch (IOException e) {
+            Log.err("Failed to initialize SSL context.", e);
+            throw new SSLContextException("Failed to initialize SSL context.", e);
+        } catch (UnrecoverableKeyException e) {
+            Log.err("Failed to initialize SSL context.", e);
+            throw new SSLContextException("Failed to initialize SSL context.", e);
+        } catch (KeyStoreException e) {
+            Log.err("Failed to initialize SSL context.", e);
+            throw new SSLContextException("Failed to initialize SSL context.", e);
+        } catch (KeyManagementException e) {
+            Log.err("Failed to initialize SSL context.", e);
+            throw new SSLContextException("Failed to initialize SSL context.", e);
+        } catch (CertificateException e) {
+            Log.err("Failed to initialize SSL context.", e);
+            throw new SSLContextException("Failed to initialize SSL context.", e);
+        }
+    }
+
+
+    private boolean requiredParamsPresent() {
+        return null != properties.h2o_ssl_jks_internal() &&
+                null != properties.h2o_ssl_jks_password();
+    }
+
+    private TrustManager[] trustManager() throws
+            KeyStoreException, IOException, CertificateException, NoSuchAlgorithmException {
+        KeyStore ksTrust = KeyStore.getInstance("JKS");
+
+        ksTrust.load(
+                new FileInputStream(properties.h2o_ssl_jts()),
+                properties.h2o_ssl_jts_password().toCharArray()
+        );
+        TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+        tmf.init(ksTrust);
+        return tmf.getTrustManagers();
+    }
+
+    private KeyManager[] keyManager() throws KeyStoreException, IOException, CertificateException, NoSuchAlgorithmException, UnrecoverableKeyException {
+        KeyStore ksKeys = KeyStore.getInstance("JKS");
+
+        ksKeys.load(new FileInputStream(properties.h2o_ssl_jks_internal()),
+                properties.h2o_ssl_jks_password().toCharArray()
+        );
+        KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+        kmf.init(ksKeys, properties.h2o_ssl_jks_password().toCharArray());
+        return kmf.getKeyManagers();
+    }
+
+    public ByteChannel wrapClientChannel(
+            SocketChannel channel,
+            String host,
+            int port) throws IOException {
+        SSLEngine sslEngine = sslContext.createSSLEngine(host, port);
+        sslEngine.setUseClientMode(false);
+        if (null != properties.h2o_ssl_enabled_algorithms()) {
+            sslEngine.setEnabledCipherSuites(properties.h2o_ssl_enabled_algorithms());
+        }
+        return new SSLSocketChannel(channel, sslEngine);
+    }
+
+    public ByteChannel wrapServerChannel(SocketChannel channel) throws IOException {
+        SSLEngine sslEngine = sslContext.createSSLEngine();
+        sslEngine.setUseClientMode(true);
+        if (null != properties.h2o_ssl_enabled_algorithms()) {
+            sslEngine.setEnabledCipherSuites(properties.h2o_ssl_enabled_algorithms());
+        }
+        return new SSLSocketChannel(channel, sslEngine);
+    }
+}
diff --git a/h2o-core/src/main/java/water/network/SecurityUtils.java b/h2o-core/src/main/java/water/network/SecurityUtils.java
new file mode 100644
index 000000000000..767fd7407e08
--- /dev/null
+++ b/h2o-core/src/main/java/water/network/SecurityUtils.java
@@ -0,0 +1,124 @@
+package water.network;
+
+import water.util.Log;
+
+import java.io.*;
+import java.security.*;
+import java.util.Properties;
+
+public class SecurityUtils {
+
+    private static SecureRandom RANDOM = new SecureRandom();
+    private static final String AB = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
+
+    private final static String[] keyToolCandidates = new String[]{
+            "sun.security.tools.KeyTool", // Java6
+            "sun.security.tools.keytool.Main", // Java7+
+            "com.ibm.crypto.tools.KeyTool" // IBM Java
+    };
+
+    private static StoreCredentials generateKeystore(String password) throws Exception {
+        return generateKeystore(password, "h2o-internal.jks", "");
+    }
+
+    private static StoreCredentials generateKeystore(String password, String name, String location) throws Exception {
+        String path = null != location && !location.isEmpty() ? location + File.pathSeparator + name : name;
+        if(new File(path).exists()) {
+            throw new IllegalStateException("A file under the location " + path + " already exists. Please delete it first.");
+        }
+
+        String[] genKeyArgs = new String[]{
+                "-genkeypair",
+                "-alias", "h2o-internal",
+                "-keyalg", "RSA",
+                "-sigalg", "SHA256withRSA",
+                "-dname", "CN=Java",
+                "-storetype", "JKS",
+                "-keypass", password,
+                "-keystore", path,
+                "-storepass", password,
+                "-validity", "3650"
+        };
+
+        Class keytool = getKeyToolClass();
+
+        keytool.getMethod("main", String[].class).invoke(null, (Object) genKeyArgs);
+
+        return new StoreCredentials(name, location, password);
+    }
+
+    private static Class getKeyToolClass() {
+        for (String keyToolCandidate : keyToolCandidates) {
+            try {
+                return Class.forName(keyToolCandidate);
+            } catch (Exception e) {
+                // Ignore, try other candidates
+            }
+        }
+
+        // Unsupported JRE/JDK
+        String errorMsg = "This version of Java is not supported.
Please use Oracle/OpenJDK/IBM JDK version 6/7/8."; + Log.err(errorMsg); + throw new IllegalStateException(errorMsg); + } + + public static SSLCredentials generateSSLPair(String passwd, String name, String location) throws Exception { + StoreCredentials jks = generateKeystore(passwd, name, location); + return new SSLCredentials(jks, jks); + } + + public static SSLCredentials generateSSLPair() throws Exception { + StoreCredentials jks = generateKeystore(passwordGenerator(16)); + return new SSLCredentials(jks, jks); + } + + private static String passwordGenerator(int len) { + StringBuilder sb = new StringBuilder(len); + for (int i = 0; i < len; i++) { + sb.append(AB.charAt(RANDOM.nextInt(AB.length()))); + } + return sb.toString(); + } + + public static String generateSSLConfig(SSLCredentials credentials) throws IOException { + return generateSSLConfig(credentials, "ssl.properties"); + } + + public static String generateSSLConfig(SSLCredentials credentials, String file) throws IOException { + Properties sslConfig = new Properties(); + sslConfig.put("h2o_ssl_protocol", "TLSv1.2"); + sslConfig.put("h2o_ssl_jks_internal", credentials.jks.getLocation()); + sslConfig.put("h2o_ssl_jks_password", credentials.jks.pass); + sslConfig.put("h2o_ssl_jts", credentials.jts.getLocation()); + sslConfig.put("h2o_ssl_jts_password", credentials.jts.pass); + FileOutputStream output = new FileOutputStream(file); + sslConfig.store(output, ""); + return file; + } + + public static class StoreCredentials { + public String name = null; + public String path = null; + public String pass = null; + + StoreCredentials(String name, String path, String pass) { + this.name = name; + this.path = path; + this.pass = pass; + } + + public String getLocation() { + return null != path && !path.isEmpty() ? path + File.pathSeparator + name : name; + } + } + + public static class SSLCredentials { + public StoreCredentials jks; + public StoreCredentials jts; + + SSLCredentials(StoreCredentials jks, StoreCredentials jts) { + this.jks = jks; + this.jts = jts; + } + } +} diff --git a/h2o-core/src/main/java/water/network/SocketChannelFactory.java b/h2o-core/src/main/java/water/network/SocketChannelFactory.java new file mode 100644 index 000000000000..58bbf02f2a70 --- /dev/null +++ b/h2o-core/src/main/java/water/network/SocketChannelFactory.java @@ -0,0 +1,38 @@ +package water.network; + +import water.H2OSecurityManager; + +import java.io.IOException; +import java.nio.channels.ByteChannel; +import java.nio.channels.SocketChannel; + +/** + * Creates either a raw or an SSL/TLS wrapped socket depending on + * the node's configuration. All sockets used in the application should be + * created using this class. 
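+ *
+ * A minimal usage sketch (illustrative only; {@code sm} stands in for however the caller
+ * obtained its H2OSecurityManager and {@code raw} for an already connected SocketChannel):
+ * <pre>
+ *   SocketChannelFactory factory = new SocketChannelFactory(sm);
+ *   ByteChannel wire = factory.clientChannel(raw, "127.0.0.1", 54321);
+ * </pre>
+ * The returned channel is an SSLSocketChannel when internal security is enabled on the node,
+ * otherwise the original channel is handed back untouched.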
+ */ +public class SocketChannelFactory { + + private H2OSecurityManager sm; + + public SocketChannelFactory(H2OSecurityManager sm) { + this.sm = sm; + } + + public ByteChannel serverChannel(ByteChannel channel) throws IOException { + if(sm.securityEnabled && !(channel instanceof SSLSocketChannel)) { + return sm.wrapServerChannel((SocketChannel)channel); + } else { + return channel; + } + } + + public ByteChannel clientChannel(ByteChannel channel, String host, int port) throws IOException { + if(sm.securityEnabled && !(channel instanceof SSLSocketChannel)) { + return sm.wrapClientChannel((SocketChannel)channel, host, port); + } else { + return channel; + } + } + +} diff --git a/h2o-core/src/main/java/water/network/SocketChannelUtils.java b/h2o-core/src/main/java/water/network/SocketChannelUtils.java new file mode 100644 index 000000000000..efbaff649083 --- /dev/null +++ b/h2o-core/src/main/java/water/network/SocketChannelUtils.java @@ -0,0 +1,23 @@ +package water.network; + +import java.nio.channels.ByteChannel; +import java.nio.channels.SocketChannel; + +public class SocketChannelUtils { + + public static boolean isSocketChannel(ByteChannel channel) { + return channel instanceof SocketChannel || channel instanceof SSLSocketChannel; + } + + public static SocketChannel underlyingSocketChannel(ByteChannel channel) { + if(channel instanceof SSLSocketChannel) { + return ((SSLSocketChannel) channel).channel(); + } else if(channel instanceof SocketChannel) { + return (SocketChannel) channel; + } + throw new UnsupportedOperationException( + "Channel is not a socket channel. Cannot retrieve the underlying channel." + ); + } + +} diff --git a/h2o-core/src/test/java/water/TestUtil.java b/h2o-core/src/test/java/water/TestUtil.java index 100726a5e8a9..752c5c0d6628 100644 --- a/h2o-core/src/test/java/water/TestUtil.java +++ b/h2o-core/src/test/java/water/TestUtil.java @@ -161,7 +161,7 @@ class TimerStatement extends Statement { /** Hunt for test files in likely places. Null if cannot find. * @param fname Test filename * @return Found file or null */ - protected static File find_test_file_static(String fname) { + public static File find_test_file_static(String fname) { // When run from eclipse, the working directory is different. 
// Try pointing at another likely place File file = new File(fname); diff --git a/h2o-core/src/test/java/water/network/SSLSocketChannelFactoryTest.java b/h2o-core/src/test/java/water/network/SSLSocketChannelFactoryTest.java new file mode 100644 index 000000000000..5b16b0018cf9 --- /dev/null +++ b/h2o-core/src/test/java/water/network/SSLSocketChannelFactoryTest.java @@ -0,0 +1,220 @@ +package water.network; + +import org.junit.Test; +import water.TestUtil; +import water.util.Log; + +import javax.net.ssl.SSLException; +import java.io.IOException; +import java.net.BindException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; + +import static org.junit.Assert.*; + +public class SSLSocketChannelFactoryTest { + + private int port = 9999; + + @Test + public void shouldHandshake() throws IOException, SSLContextException, BrokenBarrierException, InterruptedException { + SSLProperties props = new SSLProperties(); + props.put("h2o_ssl_protocol", "TLSv1.2"); + props.put("h2o_ssl_jks_internal", TestUtil.find_test_file_static("src/test/resources/keystore.jks").getPath()); + props.put("h2o_ssl_jks_password", "password"); + props.put("h2o_ssl_jts", TestUtil.find_test_file_static("src/test/resources/cacerts.jks").getPath()); + props.put("h2o_ssl_jts_password", "password"); + + final SSLSocketChannelFactory factory = new SSLSocketChannelFactory(props); + + final CyclicBarrier barrier = new CyclicBarrier(2); + final CyclicBarrier testOne = new CyclicBarrier(2); + final CyclicBarrier testTwo = new CyclicBarrier(2); + final CyclicBarrier testThree = new CyclicBarrier(2); + + final boolean[] hs = new boolean[]{true}; + + Thread client = new ClientThread(factory, testOne, testTwo, testThree, barrier); + client.setDaemon(false); + client.start(); + + try { + ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); + serverSocketChannel.socket().setReceiveBufferSize(64 * 1024); + while(true) { + try { + serverSocketChannel.socket().bind(new InetSocketAddress(port)); + break; + } catch (BindException e) { + port++; + } + } + barrier.await(); + SocketChannel sock = serverSocketChannel.accept(); + barrier.reset(); + + SSLSocketChannel wrappedChannel = (SSLSocketChannel) factory.wrapServerChannel(sock); + + assertTrue(wrappedChannel.isHandshakeComplete()); + + // FIRST TEST: SSL -> SSL SMALL COMMUNICATION + ByteBuffer readBuffer = ByteBuffer.allocate(12); + + while (readBuffer.hasRemaining()) { + wrappedChannel.read(readBuffer); + } + + readBuffer.flip(); + + byte[] dst = new byte[12]; + readBuffer.get(dst, 0, 12); + readBuffer.clear(); + + assertEquals("hello, world", new String(dst, StandardCharsets.UTF_8)); + testOne.await(); + + // SECOND TEST: SSL -> SSL BIG COMMUNICATION + int read = 0; + byte[] dstBig = new byte[16]; + ByteBuffer readBufferBig = ByteBuffer.allocate(1024); + while (read < 5 * 64 * 1024) { + while (readBufferBig.position() < 16) { + wrappedChannel.read(readBufferBig); + } + + readBufferBig.flip(); + readBufferBig.get(dstBig, 0, 16); + if (!readBufferBig.hasRemaining()) { + readBufferBig.clear(); + } else { + readBufferBig.compact(); + } + assertEquals("hello, world" + (read % 9) + "!!!", new String(dstBig, StandardCharsets.UTF_8)); + read += 16; + } + + testTwo.await(); + + // THIRD TEST: NON-SSL -> SSL COMMUNICATION + try { + while 
(readBuffer.hasRemaining()) { + wrappedChannel.read(readBuffer); + } + fail(); + } catch (SSLException e) { + // PASSED + } + + assertTrue(wrappedChannel.getEngine().isInboundDone()); + + testThree.await(); + + // FOURTH TEST: SSL -> NON-SSL COMMUNICATION + readBuffer.clear(); + while (readBuffer.hasRemaining()) { + sock.read(readBuffer); + } + + readBuffer.flip(); + readBuffer.get(dst, 0, 12); + readBuffer.clear(); + + assertNotEquals("hello, world", new String(dst, StandardCharsets.UTF_8)); + } catch (IOException | InterruptedException | BrokenBarrierException e) { + e.printStackTrace(); + } + + barrier.await(); + assertTrue("One of the handshakes failed!", hs[0]); + } + + + private class ClientThread extends Thread { + private final SSLSocketChannelFactory factory; + private final CyclicBarrier testOne; + private final CyclicBarrier testTwo; + private final CyclicBarrier testThree; + private final CyclicBarrier barrier; + + public ClientThread(SSLSocketChannelFactory factory, + CyclicBarrier testOne, + CyclicBarrier testTwo, + CyclicBarrier testThree, + CyclicBarrier barrier) { + this.factory = factory; + this.testOne = testOne; + this.testTwo = testTwo; + this.testThree = testThree; + this.barrier = barrier; + } + + @Override + public void run() { + try { + barrier.await(); + SocketChannel sock = SocketChannel.open(); + sock.socket().setReuseAddress(true); + sock.socket().setSendBufferSize(64 * 1024); + InetSocketAddress isa = new InetSocketAddress("127.0.0.1", port); + sock.connect(isa); + sock.configureBlocking(true); + sock.socket().setTcpNoDelay(true); + + SSLSocketChannel wrappedChannel = (SSLSocketChannel) factory.wrapClientChannel(sock, "127.0.0.1", port); + + // FIRST TEST: SSL -> SSL SMALL COMMUNICATION + ByteBuffer write = ByteBuffer.allocate(1024); + write.put("hello, world".getBytes(StandardCharsets.UTF_8)); + write.flip(); + wrappedChannel.write(write); + + testOne.await(); + + // SECOND TEST: SSL -> SSL BIG COMMUNICATION + ByteBuffer toWriteBig = ByteBuffer.allocate(64 * 1024); + for (int i = 0; i < 5; i++) { + toWriteBig.clear(); + while (toWriteBig.hasRemaining()) { + toWriteBig.put( + ("hello, world" + ((i * 64 * 1024 + toWriteBig.position()) % 9) + "!!!") + .getBytes(StandardCharsets.UTF_8) + ); + } + toWriteBig.flip(); + wrappedChannel.write(toWriteBig); + } + + testTwo.await(); + + // THIRD TEST: NON-SSL -> SSL COMMUNICATION + write.clear(); + write.put("hello, world".getBytes(StandardCharsets.UTF_8)); + write.flip(); + sock.write(write); + + testThree.await(); + + // FOURTH TEST: SSL -> NON-SSL COMMUNICATION + write.clear(); + write.put("hello, world".getBytes(StandardCharsets.UTF_8)); + write.flip(); + wrappedChannel.write(write); + + } catch (IOException | InterruptedException | BrokenBarrierException e) { + e.printStackTrace(); + } finally { + try { + barrier.await(); + } catch (InterruptedException | BrokenBarrierException e) { + e.printStackTrace(); + } + } + } + } + +} \ No newline at end of file diff --git a/h2o-core/src/test/java/water/network/SecurityUtilsTest.java b/h2o-core/src/test/java/water/network/SecurityUtilsTest.java new file mode 100644 index 000000000000..60112251cddb --- /dev/null +++ b/h2o-core/src/test/java/water/network/SecurityUtilsTest.java @@ -0,0 +1,41 @@ +package water.network; + +import org.junit.Test; + +import java.io.File; +import java.io.FileInputStream; +import java.util.Properties; + +import static org.junit.Assert.*; + +public class SecurityUtilsTest { + + @Test + public void shouldGenerateKeystoreAndConfig() throws Exception { 
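+        // Uses SecurityUtils to generate a throw-away keystore plus an ssl.properties file,
+        // then checks that every stored property round-trips with the expected value.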
+ try { + SecurityUtils.SSLCredentials testKeystore = SecurityUtils.generateSSLPair("test123", "h2o-keystore-test.jks", ""); + String configPath = SecurityUtils.generateSSLConfig(testKeystore, "test-ssl.properties"); + + assertTrue(new File(testKeystore.jks.getLocation()).exists()); + + Properties sslConfig = new Properties(); + sslConfig.load(new FileInputStream(configPath)); + assertEquals("TLSv1.2", sslConfig.getProperty("h2o_ssl_protocol")); + assertEquals("h2o-keystore-test.jks", sslConfig.getProperty("h2o_ssl_jks_internal")); + assertEquals("test123", sslConfig.getProperty("h2o_ssl_jks_password")); + assertEquals("h2o-keystore-test.jks", sslConfig.getProperty("h2o_ssl_jts")); + assertEquals("test123", sslConfig.getProperty("h2o_ssl_jts_password")); + } finally { + File keystore = new File("h2o-keystore-test.jks"); + if(keystore.exists()) { + keystore.deleteOnExit(); + } + + File props = new File("test-ssl.properties"); + if(props.exists()) { + props.deleteOnExit(); + } + } + } + +} \ No newline at end of file diff --git a/h2o-core/src/test/resources/cacerts.jks b/h2o-core/src/test/resources/cacerts.jks new file mode 100644 index 0000000000000000000000000000000000000000..cfbbf485f328f1ed1b7450e806a1536492f2adea GIT binary patch literal 956 zcmezO_TO6u1_mY|W(3n5xs@sTxrv#1K!MO={;lZ@tPy&q29^vA%+&@>%;g46Oz{ht znHZUvSc1iaP8;yDacZ@Bw0-AgWaMULFvu|!FyLcj4rO5zW)IEF&dV>)gNblph_GXb zaAAmWV~8-rMGWM`c@51B%!~{TjZA@L6qsviXbI)g*)>g!O31;%$jZRn#K_NJ(8S2a z)WpchFmK;SC+&}WU;ex@_uhnDEcJIq>anI0f$51Cb*`Uu=@9dotzT_mty7r4oJjq;lc5b$? zSNG~`dlzP##Hk|2y%$LEKKk|BQsa!1^LP{Wf;81EgxkAb zPGZZwSXN^DIjZU5v*#yNydPaJEdJIuyG~-|VeQ*2EH5G&tXc&gG<^BO@RDWg>{+dI zxi3oJ6_B4E^YB~d|4Vr;AYelUZ z&u;G*>z%rF;=`h89r~x4m>C%u7b_ab8^{7vh%6tA7>fw!7ZZji;l+#{U*`T!vN^Sv zvr%{zav%cJ6fh7O8DbV+IHqrX^WAaLXib6j;W}=a2C0H^$+_$gB7Po87A`tzd(He> z{A4kX6g5+)Rj)huKiRe9@KL58kDkYz$Xm6PqwkIXqHL#=H&uxU)_}H(K>?}2T^FDIn`Ohn=7QEX2M0o!!=cUTd^Ovp5ontI3)vLif zlT-H#=YE#bUFlDC{<|tq6Ayh{SN-W;{@nfNXG^rPS&Dab#`iyqoigqGjFpep)vs%t z!J*0=yRoGsJX_c(v}4nH^&Rq$_HZnHn;vm4*fe&9@m7PTiYFTcu@8IoU(Ff$y}8E*>QHxp!XLyJjNyi1=e#oSW z%-D^1w=0f#&bT=8rV*c1h<%qTJ z`1qM)0;%EER|a39-qhHbftN2K8(;aaR!H^Rx{YL7yy(#VMUnuoA0KVjJ+;#D_!@2h z2`t)N&Le%)r(*K1;SCjFWJ=M>zox5)-Q}}aObEd@SMRdsqI1`$EGNFoav*+Nwi^n| zv03ujr+~PQO|W8o2uVoha`WvK0iW`7R(TxOT!xF7W0SgkS_-P=ifY+NW*mGU_}iBF zn40!4a{UgkErS|VUUuJMn;B}G80gLt)z)rRHK_R9-|Qw+0ZRlS$MbSWb1q96m0D9~ zUQtuS1(SqeXO>CYtD(S$w&QaAEFnRGiIj@7KQ#13 zoCXd`t{d7}Tgf~uQqHl7sC*)#B6oYLFMVzBq+S(2&9tS?1K!P>6Ls{x32C(McA3) z`Mvt08(G$^N7L?oO7|F~RV9(SJ`=vEr4fl#hpI+Q&2vhvrJ`+o`#hB8b>kMXJ5COW z!BqM`KA@pPxBOdiM6r>>SL7)yHZP;_C&Kp=;s1$%|3{Frk^&mqNNtpc1`2&LZ~R#GH1z(n{-3WQ0~P*i$H}(nqf5nzMojhh)8Hk<+F&0~F%QF$dMD$= znQ|(lua4c?_~BXvZoz4Swv;4kfe$doV=v3tZAYyeV%^z+?yhFZ;-_c*DhF}{V3tYG zM_UR5v|P&Y`=TymOM4CWI|K`*>zBUV+z@uR>e|^Y)%0cM3H+*Nhfvmq-Mhc^kee`c zn`p3ogFG|8(0k76^H^y3E+sqm^s|l&6A;LvV;m+ycsh>F1}s5dWM?Jh@(-Q;C=7q> zGSlsw)I7txHt@~(si=f%OnJjl?Q(C4t2!jO3?GozvsjI=)n|xfNCWlB614OesWV~p zMD>0U7zh9s%Ol}PnUjRb2tp(vV!UiE02x*cOl0RCdg%AH@RDHVe>L$WQzs?@0WQUZ z-RhSVJw4Dol&<(5>PK?G`4(&+BVx*($NLl z|GePR8&qBjeXwd%6o6&D_b39aLWy<{lT^QV`G2^NA~@DK3;8YR!zyrwu z!nfOHQ3cEOv#_=-lTt;K{3n%!98H-sG!=nNUR5@48ziF9XYS&6Gex>2{!481)~Ddy zwzt`*DcpLJiBz}bdDk@h+l*(QEB{$Z$$%<>U2EbK9Rgq|d}3|Y-x}bbo1vv^K8|m$ zpqa#v Q|1K}c${bSZANzjzCxx8o!T $OUTDIR/out.1 2>&1 & PID_1=$! -$JVM water.H2O -name $CLUSTER_NAME -baseport $CLUSTER_BASEPORT --ga_opt_out 1> $OUTDIR/out.2 2>&1 & PID_2=$! -$JVM water.H2O -name $CLUSTER_NAME -baseport $CLUSTER_BASEPORT --ga_opt_out 1> $OUTDIR/out.3 2>&1 & PID_3=$! 
-$JVM water.H2O -name $CLUSTER_NAME -baseport $CLUSTER_BASEPORT --ga_opt_out 1> $OUTDIR/out.4 2>&1 & PID_4=$! +runCluster # If coverage is being run, then pass a system variable flag so that timeout limits are increased. if [ $JACOCO_ENABLED = true ] @@ -130,7 +128,7 @@ fi # Launch last driver JVM. All output redir'd at the OS level to sandbox files. echo Running h2o-core junit tests... -($JVM -Dbuild.id=$BUILD_ID -Djob.name=$JOB_NAME -Dgit.commit=$GIT_COMMIT -Dgit.branch=$GIT_BRANCH -Dai.h2o.name=$CLUSTER_NAME -Dai.h2o.baseport=$CLUSTER_BASEPORT -Dai.h2o.ga_opt_out=yes $JACOCO_FLAG $JUNIT_RUNNER $JUNIT_TESTS_BOOT `cat $OUTDIR/tests.txt` 2>&1 ; echo $? > $OUTDIR/status.0) 1> $OUTDIR/out.0 2>&1 +($JVM $TEST_SSL -Dbuild.id=$BUILD_ID -Djob.name=$JOB_NAME -Dgit.commit=$GIT_COMMIT -Dgit.branch=$GIT_BRANCH -Dai.h2o.name=$CLUSTER_NAME -Dai.h2o.baseport=$CLUSTER_BASEPORT -Dai.h2o.ga_opt_out=yes $JACOCO_FLAG $JUNIT_RUNNER $JUNIT_TESTS_BOOT `cat $OUTDIR/tests.txt` 2>&1 ; echo $? > $OUTDIR/status.0) 1> $OUTDIR/out.0 2>&1 grep EXECUTION $OUTDIR/out.0 | sed -e "s/.*TEST \(.*\) EXECUTION TIME: \(.*\) (Wall.*/\2 \1/" | sort -gr | head -n 10 >> $OUTDIR/out.0 diff --git a/h2o-hadoop/h2o-mapreduce-generic/src/main/java/water/hadoop/h2odriver.java b/h2o-hadoop/h2o-mapreduce-generic/src/main/java/water/hadoop/h2odriver.java index fa8006972b85..2247b542e1c9 100644 --- a/h2o-hadoop/h2o-mapreduce-generic/src/main/java/water/hadoop/h2odriver.java +++ b/h2o-hadoop/h2o-mapreduce-generic/src/main/java/water/hadoop/h2odriver.java @@ -10,6 +10,7 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; +import water.network.SecurityUtils; import java.io.*; import java.net.*; @@ -83,6 +84,8 @@ public class h2odriver extends Configured implements Tool { static ArrayList extraJvmArguments = new ArrayList(); static String jksFileName = null; static String jksPass = null; + static String securityConf = null; + static boolean internal_secure_connections = false; static boolean hashLogin = false; static boolean ldapLogin = false; static boolean kerberosLogin = false; @@ -783,6 +786,13 @@ else if (s.equals("-jks_pass")) { i++; if (i >= args.length) { usage(); } jksPass = args[i]; } + else if (s.equals("-internal_secure_connections")) { + internal_secure_connections = true; + } + else if (s.equals("-internal_security")) { + i++; if (i >= args.length) { usage(); } + securityConf = args[i]; + } else if (s.equals("-hash_login")) { hashLogin = true; } @@ -1072,7 +1082,7 @@ private int run2(String[] args) throws Exception { // Parse arguments. // ---------------- - parseArgs (args); + parseArgs(args); validateArgs(); // Set up callback address and port. @@ -1117,7 +1127,7 @@ private int run2(String[] args) throws Exception { // YARN container must be sized greater than Xmx. // YARN will kill the application if the RSS of the process is larger than // mapreduce.map.memory.mb. - long jvmInternalMemoryMegabytes = (long) ((double)megabytes * ((double)extraMemPercent)/100.0); + long jvmInternalMemoryMegabytes = (long) ((double) megabytes * ((double) extraMemPercent) / 100.0); processTotalPhysicalMemoryMegabytes = megabytes + jvmInternalMemoryMegabytes; conf.set("mapreduce.job.ubertask.enable", "false"); String mapreduceMapMemoryMb = Long.toString(processTotalPhysicalMemoryMegabytes); @@ -1136,8 +1146,7 @@ private int run2(String[] args) throws Exception { .append((enablePrintCompilation ? " -XX:+PrintCompilation" : "")) .append((enableExcludeMethods ? 
" -XX:CompileCommand=exclude,water/fvec/NewChunk.append2slowd" : "")) .append((enableLog4jDefaultInitOverride ? " -Dlog4j.defaultInitOverride=true" : "")) - .append((enableDebug ? " -agentlib:jdwp=transport=dt_socket,server=y,suspend=" + (enableSuspend ? "y" : "n") + ",address=" + debugPort : "")) - ; + .append((enableDebug ? " -agentlib:jdwp=transport=dt_socket,server=y,suspend=" + (enableSuspend ? "y" : "n") + ",address=" + debugPort : "")); for (String s : extraJvmArguments) { sb.append(" ").append(s); } @@ -1145,7 +1154,7 @@ private int run2(String[] args) throws Exception { String mapChildJavaOpts = sb.toString(); conf.set("mapreduce.map.java.opts", mapChildJavaOpts); - if (! usingYarn()) { + if (!usingYarn()) { conf.set("mapred.child.java.opts", mapChildJavaOpts); conf.set("mapred.map.child.java.opts", mapChildJavaOpts); // MapR 2.x requires this. } @@ -1157,22 +1166,22 @@ private int run2(String[] args) throws Exception { } conf.set("mapreduce.client.genericoptionsparser.used", "true"); - if (! usingYarn()) { + if (!usingYarn()) { conf.set("mapred.used.genericoptionsparser", "true"); } conf.set("mapreduce.map.speculative", "false"); - if (! usingYarn()) { + if (!usingYarn()) { conf.set("mapred.map.tasks.speculative.execution", "false"); } conf.set("mapreduce.map.maxattempts", "1"); - if (! usingYarn()) { + if (!usingYarn()) { conf.set("mapred.map.max.attempts", "1"); } conf.set("mapreduce.job.jvm.numtasks", "1"); - if (! usingYarn()) { + if (!usingYarn()) { conf.set("mapred.job.reuse.jvm.num.tasks", "1"); } @@ -1199,7 +1208,7 @@ private int run2(String[] args) throws Exception { if (flowDir != null) { addMapperArg(conf, "-flow_dir", flowDir); } - if((new File(".h2o_no_collect")).exists() || (new File(System.getProperty("user.home")+"/.h2o_no_collect")).exists()) { + if ((new File(".h2o_no_collect")).exists() || (new File(System.getProperty("user.home") + "/.h2o_no_collect")).exists()) { addMapperArg(conf, "-ga_opt_out"); } String hadoopVersion = calcHadoopVersion(); @@ -1240,6 +1249,16 @@ private int run2(String[] args) throws Exception { addMapperConf(conf, "-login_conf", "login.conf", krbConfData); } + // SSL + if (null != securityConf && !securityConf.isEmpty()) { + addMapperConf(conf, "-internal_security_conf", "security.config", securityConf); + } else if(internal_secure_connections) { + SecurityUtils.SSLCredentials credentials = SecurityUtils.generateSSLPair(); + String sslConfigFile = SecurityUtils.generateSSLConfig(credentials); + addMapperConf(conf, "", credentials.jks.name, credentials.jks.getLocation()); + addMapperConf(conf, "-internal_security_conf", "default-security.config", sslConfigFile); + } + conf.set(h2omapper.H2O_MAPPER_CONF_LENGTH, Integer.toString(mapperConfLength)); // Set up job stuff. 
diff --git a/h2o-hadoop/h2o-mapreduce-generic/src/main/java/water/hadoop/h2omapper.java b/h2o-hadoop/h2o-mapreduce-generic/src/main/java/water/hadoop/h2omapper.java index c49568c165ce..770bdb5e990e 100644 --- a/h2o-hadoop/h2o-mapreduce-generic/src/main/java/water/hadoop/h2omapper.java +++ b/h2o-hadoop/h2o-mapreduce-generic/src/main/java/water/hadoop/h2omapper.java @@ -4,6 +4,7 @@ import java.net.*; import java.util.List; import java.util.ArrayList; +import java.util.Properties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; @@ -273,7 +274,10 @@ private int run2(Context context) throws IOException, InterruptedException { int confLength = Integer.parseInt(conf.get(H2O_MAPPER_CONF_LENGTH)); for (int i = 0; i < confLength; i++) { String arg = conf.get(H2O_MAPPER_CONF_ARG_BASE + Integer.toString(i)); - argsList.add(arg); + // For files which are not passed as args (i.e. SSL certs) + if(null != arg && !arg.isEmpty()) { + argsList.add(arg); + } String basename = conf.get(H2O_MAPPER_CONF_BASENAME_BASE + Integer.toString(i)); File f = new File(ice_root); @@ -287,11 +291,17 @@ private int run2(Context context) throws IOException, InterruptedException { Log.POST(104, "after mkdirs()"); } String fileName = ice_root + File.separator + basename; - FileOutputStream out = new FileOutputStream(fileName); String payload = conf.get(H2O_MAPPER_CONF_PAYLOAD_BASE + Integer.toString(i)); byte[] byteArr = h2odriver.convertStringToByteArr(payload); h2odriver.writeBinaryFile(fileName, byteArr); - argsList.add(fileName); + if(null != arg && !arg.isEmpty()) { + argsList.add(fileName); + } + + // Need to modify this config here as we don't know the destination dir for keys when generating it + if("default-security.config".equals(basename)) { + modifyKeyPath(fileName, ice_root); + } } String[] args = argsList.toArray(new String[argsList.size()]); @@ -334,6 +344,47 @@ private int run2(Context context) throws IOException, InterruptedException { return exitStatus; } + //============================================================================== + // SSL RELATED METHODS + //============================================================================== + private void modifyKeyPath(String fileName, String ice_root) throws IOException { + FileInputStream in = null; + Properties sslProps; + try { + in = new FileInputStream(fileName); + sslProps = new Properties(); + sslProps.load(in); + } finally { + if (in != null) { + in.close(); + } + } + + subPath("h2o_ssl_jks_internal", sslProps, ice_root); + subPath("h2o_ssl_jts", sslProps, ice_root); + + FileOutputStream out = null; + try { + out = new FileOutputStream(fileName); + sslProps.store(out, null); + } finally { + if (out != null) { + out.close(); + } + } + } + + //============================================================================== + //============================================================================== + + private void subPath(String prop, Properties sslProps, String ice_root) { + String path = sslProps.getProperty(prop); + // Change only auto generated path. 
Don't allow the user to use "h2o-internal.jks" as path + if(null != path && "h2o-internal.jks".equals(path)) { + sslProps.setProperty(prop, ice_root + File.separator + path); + } + } + @Override public void run(Context context) throws IOException, InterruptedException { try { diff --git a/h2o-parsers/h2o-avro-parser/testMultiNode.sh b/h2o-parsers/h2o-avro-parser/testMultiNode.sh index a04687ebd0a4..8a8e4935d1d2 100755 --- a/h2o-parsers/h2o-avro-parser/testMultiNode.sh +++ b/h2o-parsers/h2o-avro-parser/testMultiNode.sh @@ -1,4 +1,5 @@ #!/bin/bash +source ../../multiNodeUtils.sh # Clean out any old sandbox, make a new one OUTDIR=sandbox @@ -80,14 +81,11 @@ echo $DOONLY > $OUTDIR/tests.doonly.txt # Launch 4 helper JVMs. All output redir'd at the OS level to sandbox files. CLUSTER_NAME=junit_cluster_$$ CLUSTER_BASEPORT=44000 -$JVM water.H2O -name $CLUSTER_NAME -baseport $CLUSTER_BASEPORT -ga_opt_out 1> $OUTDIR/out.1 2>&1 & PID_1=$! -$JVM water.H2O -name $CLUSTER_NAME -baseport $CLUSTER_BASEPORT -ga_opt_out 1> $OUTDIR/out.2 2>&1 & PID_2=$! -$JVM water.H2O -name $CLUSTER_NAME -baseport $CLUSTER_BASEPORT -ga_opt_out 1> $OUTDIR/out.3 2>&1 & PID_3=$! -$JVM water.H2O -name $CLUSTER_NAME -baseport $CLUSTER_BASEPORT -ga_opt_out 1> $OUTDIR/out.4 2>&1 & PID_4=$! +runCluster # Launch last driver JVM. All output redir'd at the OS level to sandbox files. echo Running h2o-avro-parser junit tests... -($JVM -Ddoonly.tests=$DOONLY -Dbuild.id=$BUILD_ID -Dignore.tests=$IGNORE -Djob.name=$JOB_NAME -Dgit.commit=$GIT_COMMIT -Dgit.branch=$GIT_BRANCH -Dai.h2o.name=$CLUSTER_NAME -Dai.h2o.baseport=$CLUSTER_BASEPORT -Dai.h2o.ga_opt_out=yes $JUNIT_RUNNER `cat $OUTDIR/tests.txt` 2>&1 ; echo $? > $OUTDIR/status.0) 1> $OUTDIR/out.0 2>&1 +($JVM $TEST_SSL -Ddoonly.tests=$DOONLY -Dbuild.id=$BUILD_ID -Dignore.tests=$IGNORE -Djob.name=$JOB_NAME -Dgit.commit=$GIT_COMMIT -Dgit.branch=$GIT_BRANCH -Dai.h2o.name=$CLUSTER_NAME -Dai.h2o.baseport=$CLUSTER_BASEPORT -Dai.h2o.ga_opt_out=yes $JUNIT_RUNNER `cat $OUTDIR/tests.txt` 2>&1 ; echo $? > $OUTDIR/status.0) 1> $OUTDIR/out.0 2>&1 grep EXECUTION $OUTDIR/out.0 | sed -e "s/.*TEST \(.*\) EXECUTION TIME: \(.*\) (Wall.*/\2 \1/" | sort -gr | head -n 10 >> $OUTDIR/out.0 diff --git a/h2o-scala/testMultiNode.sh b/h2o-scala/testMultiNode.sh index 4c6cadb0331a..510983719212 100755 --- a/h2o-scala/testMultiNode.sh +++ b/h2o-scala/testMultiNode.sh @@ -1,4 +1,5 @@ #!/bin/bash +source ../multiNodeUtils.sh # Argument parsing if [ "$1" = "jacoco" ] @@ -89,10 +90,7 @@ echo $DOONLY > $OUTDIR/tests.doonly.txt # Launch 4 helper JVMs. All output redir'd at the OS level to sandbox files. CLUSTER_NAME=junit_cluster_$$ CLUSTER_BASEPORT=45000 -$JVM water.H2O -name $CLUSTER_NAME -baseport $CLUSTER_BASEPORT --ga_opt_out 1> $OUTDIR/out.1 2>&1 & PID_1=$! -$JVM water.H2O -name $CLUSTER_NAME -baseport $CLUSTER_BASEPORT --ga_opt_out 1> $OUTDIR/out.2 2>&1 & PID_2=$! -$JVM water.H2O -name $CLUSTER_NAME -baseport $CLUSTER_BASEPORT --ga_opt_out 1> $OUTDIR/out.3 2>&1 & PID_3=$! -$JVM water.H2O -name $CLUSTER_NAME -baseport $CLUSTER_BASEPORT --ga_opt_out 1> $OUTDIR/out.4 2>&1 & PID_4=$! +runCluster # If coverage is being run, then pass a system variable flag so that timeout limits are increased. if [ $JACOCO_ENABLED = true ] @@ -104,7 +102,6 @@ fi # Launch last driver JVM. All output redir'd at the OS level to sandbox files. echo Running h2o-scala junit tests... 
- -($JVM -Ddoonly.tests=$DOONLY -Dignore.tests=$IGNORE -Dbuild.id=$BUILD_ID -Djob.name=$JOB_NAME -Dgit.commit=$GIT_COMMIT -Dgit.branch=$GIT_BRANCH -Dai.h2o.name=$CLUSTER_NAME -Dai.h2o.baseport=$CLUSTER_BASEPORT -Dai.h2o.ga_opt_out=yes $JACOCO_FLAG $JUNIT_RUNNER `cat $OUTDIR/tests.txt` 2>&1 ; echo $? > $OUTDIR/status.0) 1> $OUTDIR/out.0 2>&1 +($JVM $TEST_SSL -Ddoonly.tests=$DOONLY -Dignore.tests=$IGNORE -Dbuild.id=$BUILD_ID -Djob.name=$JOB_NAME -Dgit.commit=$GIT_COMMIT -Dgit.branch=$GIT_BRANCH -Dai.h2o.name=$CLUSTER_NAME -Dai.h2o.baseport=$CLUSTER_BASEPORT -Dai.h2o.ga_opt_out=yes $JACOCO_FLAG $JUNIT_RUNNER `cat $OUTDIR/tests.txt` 2>&1 ; echo $? > $OUTDIR/status.0) 1> $OUTDIR/out.0 2>&1 cleanup diff --git a/multiNodeUtils.sh b/multiNodeUtils.sh new file mode 100644 index 000000000000..244a8fe33a8c --- /dev/null +++ b/multiNodeUtils.sh @@ -0,0 +1,20 @@ +#!/bin/bash +SSL="" +TEST_SSL="" +if [[ "$@" == "ssl" ]]; then + if [ ! -f "../h2o-algos/src/test/resources/ssl.properties" ]; then + SSL="-internal_security_conf ../../h2o-algos/src/test/resources/ssl2.properties" + TEST_SSL="-Dai.h2o.internal_security_conf=../../h2o-algos/src/test/resources/ssl2.properties" + else + SSL="-internal_security_conf ../h2o-algos/src/test/resources/ssl.properties" + TEST_SSL="-Dai.h2o.internal_security_conf=../h2o-algos/src/test/resources/ssl.properties" + fi + +fi + +function runCluster () { + $JVM water.H2O -name $CLUSTER_NAME -baseport $CLUSTER_BASEPORT -ga_opt_out $SSL 1> $OUTDIR/out.1 2>&1 & PID_1=$! + $JVM water.H2O -name $CLUSTER_NAME -baseport $CLUSTER_BASEPORT -ga_opt_out $SSL 1> $OUTDIR/out.2 2>&1 & PID_2=$! + $JVM water.H2O -name $CLUSTER_NAME -baseport $CLUSTER_BASEPORT -ga_opt_out $SSL 1> $OUTDIR/out.3 2>&1 & PID_3=$! + $JVM water.H2O -name $CLUSTER_NAME -baseport $CLUSTER_BASEPORT -ga_opt_out $SSL 1> $OUTDIR/out.4 2>&1 & PID_4=$! +} \ No newline at end of file diff --git a/scripts/run.py b/scripts/run.py index 3610ea8b1e0b..c4145cf672d3 100755 --- a/scripts/run.py +++ b/scripts/run.py @@ -262,7 +262,7 @@ class H2OCloudNode(object): """ def __init__(self, is_client, cloud_num, nodes_per_cloud, node_num, cloud_name, h2o_jar, ip, base_port, - xmx, cp, output_dir): + xmx, cp, output_dir, test_ssl): """ Create a node in a cloud. @@ -296,6 +296,8 @@ def __init__(self, is_client, cloud_num, nodes_per_cloud, node_num, cloud_name, self.child = None self.terminated = False + self.test_ssl = test_ssl + # Choose my base port number here. All math is done here. Every node has the same # base_port and calculates it's own my_base_port. ports_per_node = 2 @@ -339,7 +341,6 @@ def start(self): "-baseport", str(self.my_base_port), "-ga_opt_out"] - # If the jacoco flag was included, then modify cmd to generate coverage # data using the jacoco agent if g_jacoco_include: @@ -359,6 +360,12 @@ def start(self): cmd = cmd[:1] + [jacoco] + cmd[1:] + if self.test_ssl: + cmd.append("-internal_security_conf") + if g_convenient: + cmd.append("../h2o-algos/src/test/resources/ssl.properties") + else: + cmd.append("../../../h2o-algos/src/test/resources/ssl3.properties") # Add S3N credentials to cmd if they exist. # ec2_hdfs_config_file_name = os.path.expanduser("~/.ec2/core-site.xml") @@ -515,7 +522,7 @@ class H2OCloud(object): A class representing one of the H2O clouds. """ - def __init__(self, cloud_num, use_client, nodes_per_cloud, h2o_jar, base_port, xmx, cp, output_dir): + def __init__(self, cloud_num, use_client, nodes_per_cloud, h2o_jar, base_port, xmx, cp, output_dir, test_ssl): """ Create a cloud. 
See node definition above for argument descriptions. @@ -530,6 +537,7 @@ def __init__(self, cloud_num, use_client, nodes_per_cloud, h2o_jar, base_port, x self.xmx = xmx self.cp = cp self.output_dir = output_dir + self.test_ssl = test_ssl # Randomly choose a seven digit cloud number. n = random.randint(1000000, 9999999) @@ -556,7 +564,8 @@ def __init__(self, cloud_num, use_client, nodes_per_cloud, h2o_jar, base_port, x self.cloud_name, self.h2o_jar, "127.0.0.1", self.base_port, - self.xmx, self.cp, self.output_dir) + self.xmx, self.cp, self.output_dir, + self.test_ssl) if is_client: self.client_nodes.append(node) else: @@ -977,7 +986,7 @@ def __init__(self, use_cloud, use_cloud2, use_client, cloud_config, use_ip, use_port, num_clouds, nodes_per_cloud, h2o_jar, base_port, xmx, cp, output_dir, failed_output_dir, path_to_tar, path_to_whl, produce_unit_reports, - testreport_dir, r_pkg_ver_chk, hadoop_namenode, on_hadoop, perf): + testreport_dir, r_pkg_ver_chk, hadoop_namenode, on_hadoop, perf, test_ssl): """ Create a runner. @@ -1015,6 +1024,8 @@ def __init__(self, self.use_ip = use_ip self.use_port = use_port + self.test_ssl = test_ssl + # Valid if use_cloud is False self.num_clouds = num_clouds self.nodes_per_cloud = nodes_per_cloud @@ -1065,7 +1076,7 @@ def __init__(self, else: for i in range(self.num_clouds): cloud = H2OCloud(i, self.use_client, self.nodes_per_cloud, h2o_jar, self.base_port, xmx, cp, - self.output_dir) + self.output_dir, self.test_ssl) self.clouds.append(cloud) @staticmethod @@ -1973,6 +1984,7 @@ def _suspect_cloud(self, ip, port): g_job_name = None g_py3 = False g_pycoverage = False +g_test_ssl = False # globals added to support better reporting in xml files g_use_xml2 = False # by default, use the original xml file output @@ -2105,6 +2117,8 @@ def usage(): print("") print(" --geterrs Generate xml file that contains the actual unit test errors and the actual Java error.") print("") + print(" --test.ssl Runs all the nodes with SSL enabled.") + print("") print(" If neither --test nor --testlist is specified, then the list of tests is") print(" discovered automatically as files matching '*runit*.R'.") print("") @@ -2222,6 +2236,7 @@ def parse_args(argv): global g_py3 global g_pycoverage global g_use_xml2 + global g_test_ssl i = 1 while i < len(argv): @@ -2388,6 +2403,8 @@ def parse_args(argv): g_job_name = argv[i] elif s == "--geterrs": g_use_xml2 = True + elif s == "--test_ssl": + g_test_ssl = True else: unknown_arg(s) @@ -2512,6 +2529,7 @@ def main(argv): global g_ncpu global g_os global g_job_name + global g_test_ssl g_script_name = os.path.basename(argv[0]) @@ -2560,7 +2578,7 @@ def main(argv): g_use_cloud, g_use_cloud2, g_use_client, g_config, g_use_ip, g_use_port, g_num_clouds, g_nodes_per_cloud, h2o_jar, g_base_port, g_jvm_xmx, g_jvm_cp, g_output_dir, g_failed_output_dir, g_path_to_tar, g_path_to_whl, g_produce_unit_reports, - testreport_dir, g_r_pkg_ver_chk, g_hadoop_namenode, g_on_hadoop, g_perf) + testreport_dir, g_r_pkg_ver_chk, g_hadoop_namenode, g_on_hadoop, g_perf, g_test_ssl) # Build test list. 
if g_exclude_list_file is not None: diff --git a/scripts/run_hadoop_job.py b/scripts/run_hadoop_job.py index 8b4f7039a294..038793616134 100755 --- a/scripts/run_hadoop_job.py +++ b/scripts/run_hadoop_job.py @@ -27,7 +27,7 @@ def is_flow(file_name): class H2OCloud: - def __init__(self, abspath_tempdir, driver, nodes_per_cloud, xmx, output_dir): + def __init__(self, abspath_tempdir, driver, nodes_per_cloud, xmx, output_dir, test_ssl): self.ip = None self.port = None self.job_id = None @@ -36,6 +36,7 @@ def __init__(self, abspath_tempdir, driver, nodes_per_cloud, xmx, output_dir): self.xmx = xmx self.output_dir = output_dir self.notify_file = os.path.join(abspath_tempdir, "notify.txt") + self.test_ssl = test_ssl def start(self): print_barrier() @@ -45,6 +46,10 @@ def start(self): '-output', self.output_dir, '-notify', self.notify_file, '-disown'] + + if self.test_ssl: + cmd.append("-internal_security_conf") + print("+ CMD: " + str(cmd)) returncode = subprocess.call(cmd) @@ -112,6 +117,7 @@ def main(): parser.add_argument("-mapperXmx", "--mapperXmx", help="Size of each H2O node", required=True) parser.add_argument("-script", "--script", help="Name of script to run", required=True) parser.add_argument("-output", "--output", help="HDFS temp output dir", required=True) + parser.add_argument("-test_ssl", "--test_ssl", help="Testing with SSL enabled", required=True) args = parser.parse_args() print_barrier() print ("Path to h2odriver: " + args.driver) @@ -119,9 +125,10 @@ def main(): print ("Size of each node: " + args.mapperXmx) print ("Script to run: " + args.script) print ("HDFS output dir: " + args.output) + print ("SSL on: " + args.test_ssl) abspath_tempdir = tempfile.mkdtemp() - g_runner = H2OCloud(abspath_tempdir, args.driver, args.nodes, args.mapperXmx, args.output) + g_runner = H2OCloud(abspath_tempdir, args.driver, args.nodes, args.mapperXmx, args.output, args.test_ssl) # Handle killing the runner. signal.signal(signal.SIGINT, signal_handler)