diff --git a/.eastwood-overrides b/.eastwood-overrides new file mode 100644 index 0000000..0abbebd --- /dev/null +++ b/.eastwood-overrides @@ -0,0 +1,5 @@ +(disable-warning + {:linter :suspicious-expression + :for-macro 'clojure.core/or + :if-inside-macroexpansion-of #{'clojure.core.async/alt!!} + :within-depth nil}) diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a6bdc77 --- /dev/null +++ b/.gitignore @@ -0,0 +1,14 @@ +/target +/classes +/checkouts +pom.xml +pom.xml.asc +*.jar +*.class +/.lein-* +/.nrepl-port +.hgignore +.hg/ +*.iml +.idea/ +*.DS_Store diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md new file mode 100644 index 0000000..af62c7b --- /dev/null +++ b/CONTRIBUTORS.md @@ -0,0 +1,7 @@ +# Protojure Contributors + +- Greg Haskins +- Srinivasan (Murali) Muralidharan +- Matt Rkiouak +- Jon Andrews +- George Lindsell diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..f49a4e1 --- /dev/null +++ b/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. \ No newline at end of file diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..cf10881 --- /dev/null +++ b/Makefile @@ -0,0 +1,51 @@ +# Copyright © 2019 State Street Bank and Trust Company. All rights reserved +# +# SPDX-License-Identifier: Apache-2.0 + +NAME = protojure +LEIN = $(shell which lein || echo ./lein) +VERSION = $(shell cat project.clj | grep defproject | awk '{ print $$3 }' | sed 's/\"//g') +OUTPUT = target/$(NAME)-$(VERSION).jar +POM = target/pom.xml +DOC = target/doc/index.html + +COVERAGE_THRESHOLD = 87 +COVERAGE_EXCLUSION += "user" +COVERAGE_EXCLUSION += "protojure.internal.pedestal.io" + +DEPS = Makefile project.clj $(shell find src -type f) + +all: scan test bin doc + +scan: + $(LEIN) cljfmt check + $(LEIN) eastwood + +# 'deep-scan' is a target for useful linters that are not conducive to automated checking, +# typically because they present some false positives without an easy mechanism to overrule +# them. So we provide the target to make it easy to run by hand, but leave them out of the +# automated gates. +deep-scan: scan + -$(LEIN) bikeshed + -$(LEIN) kibit + +.PHONY: test +test: + $(LEIN) cloverage --fail-threshold $(COVERAGE_THRESHOLD) $(patsubst %,-e %, $(COVERAGE_EXCLUSION)) + +doc: $(DOC) + +$(DOC): $(DEPS) + $(LEIN) codox + +bin: $(OUTPUT) $(POM) + +$(POM): $(DEPS) + $(LEIN) pom + cp pom.xml $@ + +$(OUTPUT): $(DEPS) + $(LEIN) jar + +clean: + $(LEIN) clean diff --git a/README.md b/README.md new file mode 100644 index 0000000..6201a89 --- /dev/null +++ b/README.md @@ -0,0 +1,19 @@ +# protojure + +Native Clojure support for [Google Protocol Buffer](https://developers.google.com/protocol-buffers/) and [GRPC](https://grpc.io/). + +## Documentation + +You can read the [SDK documentation](https://github.ooa.sttgts.com/pages/dlt/protojure-lib) + +## Usage + +Coming soon! + +## License + +This project is licensed under the Apache License 2.0. + +## Contributing + +We cannot (yet) accept outside contributions to the code base at this time. Please check for updates in the future regarding acceptance of outside contributions. diff --git a/dev-resources/user.clj b/dev-resources/user.clj new file mode 100644 index 0000000..41528b1 --- /dev/null +++ b/dev-resources/user.clj @@ -0,0 +1,6 @@ +;; Copyright © 2019 State Street Bank and Trust Company. All rights reserved +;; +;; SPDX-License-Identifier: Apache-2.0 + +(ns user + (:require [clojure.tools.namespace.repl :refer [refresh]])) diff --git a/lein b/lein new file mode 100755 index 0000000..f110171 --- /dev/null +++ b/lein @@ -0,0 +1,381 @@ +#!/usr/bin/env bash + +# Ensure this file is executable via `chmod a+x lein`, then place it +# somewhere on your $PATH, like ~/bin. The rest of Leiningen will be +# installed upon first run into the ~/.lein/self-installs directory. + +export LEIN_VERSION="2.8.1" + +case $LEIN_VERSION in + *SNAPSHOT) SNAPSHOT="YES" ;; + *) SNAPSHOT="NO" ;; +esac + +if [[ "$CLASSPATH" != "" ]]; then + echo "WARNING: You have \$CLASSPATH set, probably by accident." + echo "It is strongly recommended to unset this before proceeding." +fi + +if [[ "$OSTYPE" == "cygwin" ]] || [[ "$OSTYPE" == "msys" ]]; then + delimiter=";" +else + delimiter=":" +fi + +if [[ "$OSTYPE" == "cygwin" ]]; then + cygwin=true +else + cygwin=false +fi + +function command_not_found { + >&2 echo "Leiningen couldn't find $1 in your \$PATH ($PATH), which is required." + exit 1 +} + +function make_native_path { + # ensure we have native paths + if $cygwin && [[ "$1" == /* ]]; then + echo -n "$(cygpath -wp "$1")" + elif [[ "$OSTYPE" == "msys" && "$1" == /?/* ]]; then + echo -n "$(sh -c "(cd $1 2 /dev/null + download_failed_message "$LEIN_URL" "$exit_code" + exit 1 + fi +} + +NOT_FOUND=1 +ORIGINAL_PWD="$PWD" +while [ ! -r "$PWD/project.clj" ] && [ "$PWD" != "/" ] && [ $NOT_FOUND -ne 0 ] +do + cd .. + if [ "$(dirname "$PWD")" = "/" ]; then + NOT_FOUND=0 + cd "$ORIGINAL_PWD" + fi +done + +export LEIN_HOME="${LEIN_HOME:-"$HOME/.lein"}" + +for f in "/etc/leinrc" "$LEIN_HOME/leinrc" ".leinrc"; do + if [ -e "$f" ]; then + source "$f" + fi +done + +if $cygwin; then + export LEIN_HOME=$(cygpath -w "$LEIN_HOME") +fi + +LEIN_JAR="$LEIN_HOME/self-installs/leiningen-$LEIN_VERSION-standalone.jar" + +# normalize $0 on certain BSDs +if [ "$(dirname "$0")" = "." ]; then + SCRIPT="$(which "$(basename "$0")")" + if [ -z "$SCRIPT" ]; then + SCRIPT="$0" + fi +else + SCRIPT="$0" +fi + +# resolve symlinks to the script itself portably +while [ -h "$SCRIPT" ] ; do + ls=$(ls -ld "$SCRIPT") + link=$(expr "$ls" : '.*-> \(.*\)$') + if expr "$link" : '/.*' > /dev/null; then + SCRIPT="$link" + else + SCRIPT="$(dirname "$SCRIPT"$)/$link" + fi +done + +BIN_DIR="$(dirname "$SCRIPT")" + +export LEIN_JVM_OPTS="${LEIN_JVM_OPTS-"-Xverify:none -XX:+TieredCompilation -XX:TieredStopAtLevel=1"}" + +# This needs to be defined before we call HTTP_CLIENT below +if [ "$HTTP_CLIENT" = "" ]; then + if type -p curl >/dev/null 2>&1; then + if [ "$https_proxy" != "" ]; then + CURL_PROXY="-x $https_proxy" + fi + HTTP_CLIENT="curl $CURL_PROXY -f -L -o" + else + HTTP_CLIENT="wget -O" + fi +fi + + +# When :eval-in :classloader we need more memory +grep -E -q '^\s*:eval-in\s+:classloader\s*$' project.clj 2> /dev/null && \ + export LEIN_JVM_OPTS="$LEIN_JVM_OPTS -Xms64m -Xmx512m" + +if [ -r "$BIN_DIR/../src/leiningen/version.clj" ]; then + # Running from source checkout + LEIN_DIR="$(dirname "$BIN_DIR")" + + # Need to use lein release to bootstrap the leiningen-core library (for aether) + if [ ! -r "$LEIN_DIR/leiningen-core/.lein-bootstrap" ]; then + echo "Leiningen is missing its dependencies." + echo "Please run \"lein bootstrap\" in the leiningen-core/ directory" + echo "with a stable release of Leiningen. See CONTRIBUTING.md for details." + exit 1 + fi + + # If project.clj for lein or leiningen-core changes, we must recalculate + LAST_PROJECT_CHECKSUM=$(cat "$LEIN_DIR/.lein-project-checksum" 2> /dev/null) + PROJECT_CHECKSUM=$(sum "$LEIN_DIR/project.clj" "$LEIN_DIR/leiningen-core/project.clj") + if [ "$PROJECT_CHECKSUM" != "$LAST_PROJECT_CHECKSUM" ]; then + if [ -r "$LEIN_DIR/.lein-classpath" ]; then + rm "$LEIN_DIR/.lein-classpath" + fi + fi + + # Use bin/lein to calculate its own classpath. + if [ ! -r "$LEIN_DIR/.lein-classpath" ] && [ "$1" != "classpath" ]; then + echo "Recalculating Leiningen's classpath." + ORIG_PWD="$PWD" + cd "$LEIN_DIR" + + LEIN_NO_USER_PROFILES=1 $0 classpath .lein-classpath + sum "$LEIN_DIR/project.clj" "$LEIN_DIR/leiningen-core/project.clj" > \ + .lein-project-checksum + cd "$ORIG_PWD" + fi + + mkdir -p "$LEIN_DIR/target/classes" + export LEIN_JVM_OPTS="$LEIN_JVM_OPTS -Dclojure.compile.path=$LEIN_DIR/target/classes" + add_path CLASSPATH "$LEIN_DIR/leiningen-core/src/" "$LEIN_DIR/leiningen-core/resources/" \ + "$LEIN_DIR/test:$LEIN_DIR/target/classes" "$LEIN_DIR/src" ":$LEIN_DIR/resources" + + if [ -r "$LEIN_DIR/.lein-classpath" ]; then + add_path CLASSPATH "$(cat "$LEIN_DIR/.lein-classpath" 2> /dev/null)" + else + add_path CLASSPATH "$(cat "$LEIN_DIR/leiningen-core/.lein-bootstrap" 2> /dev/null)" + fi +else # Not running from a checkout + add_path CLASSPATH "$LEIN_JAR" + + if [ "$LEIN_USE_BOOTCLASSPATH" != "" ]; then + LEIN_JVM_OPTS="-Xbootclasspath/a:$LEIN_JAR $LEIN_JVM_OPTS" + fi + + if [ ! -r "$LEIN_JAR" -a "$1" != "self-install" ]; then + self_install + fi +fi + +if [ ! -x "$JAVA_CMD" ] && ! type -f java >/dev/null +then + >&2 echo "Leiningen couldn't find 'java' executable, which is required." + >&2 echo "Please either set JAVA_CMD or put java (>=1.6) in your \$PATH ($PATH)." + exit 1 +fi + +export LEIN_JAVA_CMD="${LEIN_JAVA_CMD:-${JAVA_CMD:-java}}" + +if [[ -z "${DRIP_INIT+x}" && "$(basename "$LEIN_JAVA_CMD")" == *drip* ]]; then + export DRIP_INIT="$(printf -- '-e\n(require (quote leiningen.repl))')" + export DRIP_INIT_CLASS="clojure.main" +fi + +# Support $JAVA_OPTS for backwards-compatibility. +export JVM_OPTS="${JVM_OPTS:-"$JAVA_OPTS"}" + +# Handle jline issue with cygwin not propagating OSTYPE through java subprocesses: https://github.com/jline/jline2/issues/62 +cygterm=false +if $cygwin; then + case "$TERM" in + rxvt* | xterm* | vt*) cygterm=true ;; + esac +fi + +if $cygterm; then + LEIN_JVM_OPTS="$LEIN_JVM_OPTS -Djline.terminal=jline.UnixTerminal" + stty -icanon min 1 -echo > /dev/null 2>&1 +fi + +# TODO: investigate http://skife.org/java/unix/2011/06/20/really_executable_jars.html +# If you're packaging this for a package manager (.deb, homebrew, etc) +# you need to remove the self-install and upgrade functionality or see lein-pkg. +if [ "$1" = "self-install" ]; then + if [ -r "$BIN_DIR/../src/leiningen/version.clj" ]; then + echo "Running self-install from a checkout is not supported." + echo "See CONTRIBUTING.md for SNAPSHOT-specific build instructions." + exit 1 + fi + echo "Manual self-install is deprecated; it will run automatically when necessary." + self_install +elif [ "$1" = "upgrade" ] || [ "$1" = "downgrade" ]; then + if [ "$LEIN_DIR" != "" ]; then + echo "The upgrade task is not meant to be run from a checkout." + exit 1 + fi + if [ $SNAPSHOT = "YES" ]; then + echo "The upgrade task is only meant for stable releases." + echo "See the \"Bootstrapping\" section of CONTRIBUTING.md." + exit 1 + fi + if [ ! -w "$SCRIPT" ]; then + echo "You do not have permission to upgrade the installation in $SCRIPT" + exit 1 + else + TARGET_VERSION="${2:-stable}" + echo "The script at $SCRIPT will be upgraded to the latest $TARGET_VERSION version." + echo -n "Do you want to continue [Y/n]? " + read RESP + case "$RESP" in + y|Y|"") + echo + echo "Upgrading..." + TARGET="/tmp/lein-${$}-upgrade" + if $cygwin; then + TARGET=$(cygpath -w "$TARGET") + fi + LEIN_SCRIPT_URL="https://github.com/technomancy/leiningen/raw/$TARGET_VERSION/bin/lein" + $HTTP_CLIENT "$TARGET" "$LEIN_SCRIPT_URL" + if [ $? == 0 ]; then + cmp -s "$TARGET" "$SCRIPT" + if [ $? == 0 ]; then + echo "Leiningen is already up-to-date." + fi + mv "$TARGET" "$SCRIPT" && chmod +x "$SCRIPT" + exec "$SCRIPT" version + else + download_failed_message "$LEIN_SCRIPT_URL" + fi;; + *) + echo "Aborted." + exit 1;; + esac + fi +else + if $cygwin; then + # When running on Cygwin, use Windows-style paths for java + ORIGINAL_PWD=$(cygpath -w "$ORIGINAL_PWD") + fi + + # apply context specific CLASSPATH entries + if [ -f .lein-classpath ]; then + add_path CLASSPATH "$(cat .lein-classpath)" + fi + + if [ -n "$DEBUG" ]; then + echo "Leiningen's classpath: $CLASSPATH" + fi + + if [ -r .lein-fast-trampoline ]; then + export LEIN_FAST_TRAMPOLINE='y' + fi + + if [ "$LEIN_FAST_TRAMPOLINE" != "" ] && [ -r project.clj ]; then + INPUTS="$* $(cat project.clj) $LEIN_VERSION $(test -f "$LEIN_HOME/profiles.clj" && cat "$LEIN_HOME/profiles.clj")" + + if command -v shasum >/dev/null 2>&1; then + SUM="shasum" + elif command -v sha1sum >/dev/null 2>&1; then + SUM="sha1sum" + else + command_not_found "sha1sum or shasum" + fi + + export INPUT_CHECKSUM=$(echo "$INPUTS" | $SUM | cut -f 1 -d " ") + # Just don't change :target-path in project.clj, mkay? + TRAMPOLINE_FILE="target/trampolines/$INPUT_CHECKSUM" + else + if hash mktemp 2>/dev/null; then + # Check if mktemp is available before using it + TRAMPOLINE_FILE="$(mktemp /tmp/lein-trampoline-XXXXXXXXXXXXX)" + else + TRAMPOLINE_FILE="/tmp/lein-trampoline-$$" + fi + trap 'rm -f $TRAMPOLINE_FILE' EXIT + fi + + if $cygwin; then + TRAMPOLINE_FILE=$(cygpath -w "$TRAMPOLINE_FILE") + fi + + if [ "$INPUT_CHECKSUM" != "" ] && [ -r "$TRAMPOLINE_FILE" ]; then + if [ -n "$DEBUG" ]; then + echo "Fast trampoline with $TRAMPOLINE_FILE." + fi + exec sh -c "exec $(cat "$TRAMPOLINE_FILE")" + else + export TRAMPOLINE_FILE + "$LEIN_JAVA_CMD" \ + -Dfile.encoding=UTF-8 \ + -Dmaven.wagon.http.ssl.easy=false \ + -Dmaven.wagon.rto=10000 \ + $LEIN_JVM_OPTS \ + -Dleiningen.original.pwd="$ORIGINAL_PWD" \ + -Dleiningen.script="$SCRIPT" \ + -classpath "$CLASSPATH" \ + clojure.main -m leiningen.core.main "$@" + + EXIT_CODE=$? + + if $cygterm ; then + stty icanon echo > /dev/null 2>&1 + fi + + if [ -r "$TRAMPOLINE_FILE" ] && [ "$LEIN_TRAMPOLINE_WARMUP" = "" ]; then + TRAMPOLINE="$(cat "$TRAMPOLINE_FILE")" + if [ "$INPUT_CHECKSUM" = "" ]; then # not using fast trampoline + rm "$TRAMPOLINE_FILE" + fi + if [ "$TRAMPOLINE" = "" ]; then + exit $EXIT_CODE + else + exec sh -c "exec $TRAMPOLINE" + fi + else + exit $EXIT_CODE + fi + fi +fi diff --git a/project.clj b/project.clj new file mode 100644 index 0000000..fc4f8d6 --- /dev/null +++ b/project.clj @@ -0,0 +1,41 @@ +(defproject protojure "0.4.0-alpha12-SNAPSHOT" + :description "Support library for protoc-gen-clojure, providing native Clojure support for Google Protocol Buffers and GRPC applications" + :url "http://github.com/protojure/library" + :license {:name "Apache License 2.0" + :url "https://www.apache.org/licenses/LICENSE-2.0" + :year 2019 + :key "apache-2.0"} + :plugins [[lein-codox "0.10.4"] + [lein-cljfmt "0.5.7"] + [jonase/eastwood "0.2.6"] + [lein-kibit "0.1.6"] + [lein-bikeshed "0.5.1"] + [lein-cloverage "1.0.13"]] + :dependencies [[org.clojure/clojure "1.10.0" :scope "provided"] + [org.clojure/core.async "0.4.490" :scope "provided"] + [com.google.protobuf/protobuf-java "3.7.1" :scope "provided"] + [io.undertow/undertow-core "2.0.20.Final" :scope "provided"] + [io.undertow/undertow-servlet "2.0.20.Final" :scope "provided"] + [org.eclipse.jetty.http2/http2-client "9.4.17.v20190418" :scope "provided"] + [io.pedestal/pedestal.log "0.5.5" :scope "provided"] + [io.pedestal/pedestal.service "0.5.5" :scope "provided"] + [org.clojure/tools.logging "0.4.1"] + [org.apache.commons/commons-compress "1.18"] + [commons-io/commons-io "2.6"] + [funcool/promesa "2.0.1"] + [lambdaisland/uri "1.1.0"]] + :aot [protojure.internal.grpc.codec.io + protojure.internal.pedestal.io] + :codox {:metadata {:doc/format :markdown} + :namespaces [#"^(?!protojure.internal)"]} + :eastwood {:debug [:none] + :exclude-linters [:constant-test] + :add-linters [:unused-namespaces] + :config-files [".eastwood-overrides"] + :exclude-namespaces [example.hello protojure.grpc-test]} + :profiles {:dev {:dependencies [[org.clojure/tools.namespace "0.2.11"] + [clj-http "3.9.1"] + [com.taoensso/timbre "4.10.0"] + [org.clojure/data.codec "0.1.1"]] + :resource-paths ["test/resources"]}}) + diff --git a/src/protojure/grpc/client/api.clj b/src/protojure/grpc/client/api.clj new file mode 100644 index 0000000..3ca2719 --- /dev/null +++ b/src/protojure/grpc/client/api.clj @@ -0,0 +1,68 @@ +;; Copyright © 2019 State Street Bank and Trust Company. All rights reserved +;; +;; SPDX-License-Identifier: Apache-2.0 + +(ns protojure.grpc.client.api + "Provider independent client API for invoking GRPC requests") + +(defprotocol Provider + ;;;;---------------------------------------------------------------------------------------------------- + (invoke [this params] + " +Invokes a GRPC-based remote-procedure against the provider + +#### Parameters +A map with the following entries: + +| Value | Type | Description | +|-----------------|-------------------|---------------------------------------------------------------------------| +| **service** | _String_ | The GRPC service-name of the endpoint | +| **method** | _String_ | The GRPC method-name of the endpoint | +| **metadata** | _map_ | Optional [string string] tuples that will be submitted as attributes to the request, such as via HTTP headers for GRPC-HTTP2 | +| **input** | _map_ | See _Input_ section below | +| **output** | _map_ | See _Output_ section below | + +##### Unary vs Streaming Input + +Any [GRPC Service endpoint](https://grpc.io/docs/guides/concepts.html#service-definition) can define methods that take either unary or streaming inputs or outputs. +This API assumes core.async channels in either case as a 'streaming first' design. For unary input, simply produce one message before closing the stream. Closing the stream indicates that the input is complete. + +##### Input +The _input_ parameter is a map with the following fields: + +| Value | Type | Description | +|-----------------|-------------------|---------------------------------------------------------------------------| +| **f** | _(fn [map])_ | A protobuf new-XX function, such as produced by the protoc-gen-clojure compiler, to be applied to any outbound request messages | +| **ch** | _core.async/chan_ | A core.async channel used to send input parameters. Close to complete. | + +##### Output +The _output_ parameter is a map with the following fields: + +| Value | Type | Description | +|-----------------|-------------------|---------------------------------------------------------------------------| +| **f** | _(fn [is])_ | A protobuf pb->msg function, such as produced by the protoc-gen-clojure compiler, to be applied to any incoming response messages | +| **ch** | _core.async/chan_ | A core.async channel that will be populated with any GRPC return messages. Unary responses arrive as a single message on the channel. Will close when the response is complete. | + +#### Return value +A promise that, on success, evaluates to a map with the following entries: + +| Value | Type | Description | +|-----------------|----------|---------------------------------------------------------------------------| +| **status** | _Int_ | The [GRPC status code](https://github.com/grpc/grpc/blob/master/doc/statuscodes.md) returned from the remote procedure | +| **message** | _String_ | The GRPC message (if any) returned from the remote procedure | + +#### Example + +``` +(let [{:keys [status message]} @(invoke client {:service \"my.service\" + :method \"MyMethod\" + :input {:f myservice/new-MyRequest :ch input} + :output {:f myservice/pb->MyResponse :ch output}})] + (println \"status:\" status)) +``` + +") + + ;;;;---------------------------------------------------------------------------------------------------- + (disconnect [this] + "Disconnects from the GRPC endpoint and releases all resources held by the underlying provider")) \ No newline at end of file diff --git a/src/protojure/grpc/client/providers/http2.clj b/src/protojure/grpc/client/providers/http2.clj new file mode 100644 index 0000000..dbaa462 --- /dev/null +++ b/src/protojure/grpc/client/providers/http2.clj @@ -0,0 +1,36 @@ +;; Copyright © 2019 State Street Bank and Trust Company. All rights reserved +;; +;; SPDX-License-Identifier: Apache-2.0 + +(ns protojure.grpc.client.providers.http2 + "Implements the [GRPC-HTTP2](https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md) protocol for clients" + (:require [protojure.internal.grpc.client.providers.http2.core :as core] + [protojure.internal.grpc.client.providers.http2.jetty :as jetty] + [protojure.grpc.codec.compression :refer [builtin-codecs]] + [promesa.core :as p] + [lambdaisland.uri :as lambdaisland] + [clojure.tools.logging :as log])) + +(defn connect + " +Connects the client to a [GRPC-HTTP2](https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md) compatible server + +#### Parameters +A map with the following entries: + +| Value | Type | Default | Description | +|--------------------|----------|-------------------------------------------------------------------------------------| +| **uri** | _String_ | n/a | The URI of the GRPC server | +| **codecs** | _map_ | [[protojure.grpc.codec.core/builtin-codecs]] | Optional custom codecs | +| **content-coding** | _String_ | nil | The encoding to use on request data | +| **max-frame-size** | _UInt32_ | 16384 | The maximum HTTP2 DATA frame size | + +#### Return value +A promise that, on success, evaluates to an instance of [[api/Provider]]. +_(api/disconnect)_ should be used to release any resources when the connection is no longer required. + " + [{:keys [uri codecs content-coding max-frame-size] :or {codecs builtin-codecs max-frame-size 16384} :as params}] + (log/debug "Connecting with GRPC-HTTP2:" params) + (let [{:keys [host port]} (lambdaisland/uri uri)] + (-> (jetty/connect {:host host :port (Integer/parseInt port)}) + (p/then #(core/->Http2Provider % uri codecs content-coding max-frame-size))))) diff --git a/src/protojure/grpc/client/utils.clj b/src/protojure/grpc/client/utils.clj new file mode 100644 index 0000000..65b344a --- /dev/null +++ b/src/protojure/grpc/client/utils.clj @@ -0,0 +1,52 @@ +;; Copyright © 2019 State Street Bank and Trust Company. All rights reserved +;; +;; SPDX-License-Identifier: Apache-2.0 + +(ns protojure.grpc.client.utils + "Functions used for grpc unary calls in clients generated by protojure-protoc-plugin" + (:require [promesa.core :as p] + [protojure.grpc.client.api :as grpc] + [clojure.core.async :as async])) + +(defn- take [ch] + (p/promise + (fn [resolve reject] + (async/take! ch resolve)))) + +(defn- put [ch val] + (p/promise + (fn [resolve reject] + (async/put! ch val resolve)))) + +(defn send-unary-params + " +Places an item on a channel and then closes the channel, returning a promise that completes +after the channel is closed. Used in remote procedure calls with unary parameters. + +#### Parameters + +| Value | Type | Description | +|-------------|----------------------|----------------------------------------------------------------------------| +| **ch** | _core.async/channel_ | A core.async channel expected to carry 'params' and be subsequently closed | +| **params** | _any_ | The object to place on the channel | + " + [ch params] + (-> (put ch params) + (p/then (fn [_] (async/close! ch))))) + +(defn invoke-unary + " +Invokes a GRPC operation similar to the invoke operation within [[api/Provider]], but the promise returned +resolves to a decoded result when successful. Used in remote procedure calls with unary return types. + +#### Parameters + +| Value | Type | Description | +|-------------|----------------------|----------------------------------------------------------------------------| +| **client** | _[[api/Provider]]_ | An instance of a client provider | +| **params** | _map_ | See 'params' in the '(invoke ..)' method within [[api/Provider]] | +| **ch** | _core.async/channel_ | A core.async channel expected to carry the response data | + " + [client params ch] + (-> (grpc/invoke client params) + (p/then (fn [_] (take ch))))) \ No newline at end of file diff --git a/src/protojure/grpc/codec/compression.clj b/src/protojure/grpc/codec/compression.clj new file mode 100644 index 0000000..f00bbd6 --- /dev/null +++ b/src/protojure/grpc/codec/compression.clj @@ -0,0 +1,85 @@ +;; Copyright © 2019 State Street Bank and Trust Company. All rights reserved +;; +;; SPDX-License-Identifier: Apache-2.0 + +(ns protojure.grpc.codec.compression + (:import (org.apache.commons.compress.compressors.gzip GzipCompressorInputStream + GzipCompressorOutputStream + GzipParameters) + (org.apache.commons.compress.compressors.snappy FramedSnappyCompressorInputStream + FramedSnappyCompressorOutputStream) + (org.apache.commons.compress.compressors.deflate DeflateCompressorInputStream + DeflateCompressorOutputStream))) + +;;-------------------------------------------------------------------------------------- +;; compression support +;;-------------------------------------------------------------------------------------- +(def ^:no-doc _builtin-codecs + [{:name "gzip" + :input #(GzipCompressorInputStream. %) + :output #(let [params (GzipParameters.)] (.setCompressionLevel params 9) (GzipCompressorOutputStream. % params))} + + {:name "snappy" + :input #(FramedSnappyCompressorInputStream. %) + :output #(FramedSnappyCompressorOutputStream. %)} + + {:name "deflate" + :input #(DeflateCompressorInputStream. %) + :output #(DeflateCompressorOutputStream. %)}]) + +(def builtin-codecs + " +A map of built-in compression [codecs](https://en.wikipedia.org/wiki/Codec), keyed by name. + +| Name | Description | +|--------------|--------------------------------------------------------| +| \"gzip\" | [gzip](https://en.wikipedia.org/wiki/Gzip) codec | +| \"deflate\" | [deflate](https://en.wikipedia.org/wiki/DEFLATE) codec | +| \"snappy\" | [snappy](https://github.com/google/snappy) codec | + +These built-in codecs are used by default, unless the caller overrides the codec dictionary. A common use +case would be to augment the built-in codecs with 1 or more custom codecs. + +#### Custom codecs + +##### Map specification +The codec map consists of a collection of name/value pairs of codec-specifications keyed by a string representing +the name of the codec. + +``` +[\"mycodec\" {:input inputfn :output outputfn}] +``` + +where + +- **inputfn**: a (fn) that accepts an InputStream input, and returns an InputStream +- **outputfn**: a (fn) that accepts an OutputStream input, and returns an OutputStream + +##### Example + +``` +(assoc builtin-codecs + \"mycodec\" {:input clojure.core/identity :output clojure.core/identity}) +``` + +**N.B.**: The output stream returned in _outputfn_ will have its (.close) method invoked to finalize +compression. Therefore, the use of `identity` above would be problematic in the real-world since we +may not wish to actually close the underlying stream at that time. Therefore, its use above is only for +simplistic demonstration. A functional \"pass through\" example could be built using something like +[CloseShieldOutputStream](https://commons.apache.org/proper/commons-io/javadocs/api-2.4/org/apache/commons/io/output/CloseShieldOutputStream.html) + " + (->> _builtin-codecs (map #(vector (:name %) %)) (into {}))) + +(defn- get-codec-by-polarity [factory polarity] + (if-let [codec (get factory polarity)] + codec + (throw (ex-info "CODEC polarity not found" {:codec factory :polarity polarity})))) + +(defn- get-codec [codecs type polarity] + (if-let [factory (get codecs type)] + (get-codec-by-polarity factory polarity) + (throw (ex-info "Unknown CODEC name" {:name type :polarity polarity})))) + +(defn ^:no-doc compressor [codecs type] (get-codec codecs type :output)) +(defn ^:no-doc decompressor [codecs type] (get-codec codecs type :input)) + diff --git a/src/protojure/grpc/codec/lpm.clj b/src/protojure/grpc/codec/lpm.clj new file mode 100644 index 0000000..3c6c534 --- /dev/null +++ b/src/protojure/grpc/codec/lpm.clj @@ -0,0 +1,248 @@ +;; Copyright © 2019 State Street Bank and Trust Company. All rights reserved +;; +;; SPDX-License-Identifier: Apache-2.0 + +(ns protojure.grpc.codec.lpm + "Utility functions for GRPC [length-prefixed-message](https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests) encoding." + (:require [clojure.core.async :refer [! go go-loop] :as async] + [promesa.core :as p] + [protojure.protobuf :as pb] + [protojure.grpc.codec.compression :as compression] + [clojure.tools.logging :as log]) + (:import (protojure.internal.grpc.io InputStream + OutputStream) + (java.io ByteArrayOutputStream) + (org.apache.commons.io.input BoundedInputStream)) + (:refer-clojure :exclude [resolve])) + +;;-------------------------------------------------------------------------------------- +;; integer serdes used for GRPC framing +;;-------------------------------------------------------------------------------------- +(defn- bytes->num + "Deserializes an integer from a byte-array. + + Shamelessly borrowed from https://gist.github.com/pingles/1235344" + [data] + (->> data + (map-indexed + (fn [i x] + (bit-shift-left (bit-and x 0x0FF) + (* 8 (- (count data) i 1))))) + (reduce bit-or))) + +(defn- num->bytes + "Serializes an integer to a byte-array." + [num] + (byte-array (for [i (range 4)] + (-> (unsigned-bit-shift-right num + (* 8 (- 4 i 1))) + (bit-and 0x0FF))))) + +;;====================================================================================== +;; GRPC length-prefixed-message (LPM) codec +;;====================================================================================== +;; +;; GRPC encodes protobuf messages as: +;; +;; [ +;; 1b : compressed? (0 = no, 1 = yes) +;; 4b : length of message +;; Nb : N = length bytes of optionally compressed protobuf +;; +;; ] +;; +;; Reference: +; https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests +;;====================================================================================== + +;;-------------------------------------------------------------------------------------- +;; Decoder +;;-------------------------------------------------------------------------------------- +(defn- decoder-stream [is compressed? decompressor] + (if (and compressed? (some? decompressor)) + (decompressor is) + is)) + +(defn- decode-header + "Decodes 5-bytes into a {compressed? len} tuple" + [hdr] + (let [compressed? (first hdr) + lenbuf (rest hdr)] + {:compressed? (pos? compressed?) :len (bytes->num lenbuf)})) + +(defn- decode-body + "Decodes a LPM payload based on a previously decoded header (see [[decode-header]])" + [f is {:keys [compressed? len] :as header} options] + (-> (BoundedInputStream. is len) + (decoder-stream compressed? options) + f)) + +(defn- _decode [f is buf data decompressor] + (let [buf (conj buf data)] + (if (= (count buf) 5) + (let [hdr (decode-header buf)] + [[] (decode-body f is hdr decompressor)]) + [buf nil]))) + +;;-------------------------------------------------------------------------------------------- + +(defn decode + " +Takes a parsing function, a pair of input/output [core.async channels](https://clojuredocs.org/clojure.core.async/chan), and an optional codec and +decodes a stream of [length-prefixed-message](https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests) (LPM). + +#### Parameters + +| Value | Type | Description | +|-----------------|----------------------|---------------------------------------------------------------------------| +| **f** | _(fn [is])_ | A protobuf decoder function with an arity of 1 that accepts an instance of [InputStream](https://docs.oracle.com/javase/7/docs/api/java/io/InputStream.html), such as the pb-> functions produced by the protoc-gen-clojure compiler | +| **input** | _core.async/channel_ | A core.async channel that carries LPM encoded bytes, one byte per [(take!)](https://clojuredocs.org/clojure.core.async/take!). Closing the input will shut down the pipeline. | +| **output** | _core.async/channel_ | A core.async channel that carries decoded protobuf records. Will be closed when the pipeline shuts down. | +| **options** | _map_ | See _Options_ + +##### Options + +| Value | Type | Default | Description | +|--------------------|-----------------------------------------------------------------| +| **content-coding** | _string_ | \"identity\" | See _Content-coding_ table | +| **codecs** | _map_ | [[protojure.grpc.codec.compression/builtin-codecs]] | The dictionary of codecs to utilize | +| **tmo** | _unsigned int_ | 5000ms | A timeout, in ms, for receiving the remaining LPM payload bytes once the header has been received. | + +###### Example + +``` +{:content-coding \"gzip\" + :codecs mycodecs + :tmo 1000} +``` + +##### Content-coding +The value for the **content-coding** option must be one of + +| Value | Comment | +|----------------|-------------------------------------------| +| nil | no compression | +| \"identity\" | no compression | +| _other string_ | retrieves the codec from _codecs_ option | + + +" + [f input output {:keys [codecs content-coding tmo] :or {codecs compression/builtin-codecs tmo 5000} :as options}] + (let [is (InputStream. {:ch input :tmo tmo}) + decompressor (when (and (some? content-coding) (not= content-coding "identity")) + (compression/decompressor codecs content-coding))] + (p/promise + (fn [resolve reject] + (go + (try + (loop [acc []] + (if-let [data (! output msg)) + (recur acc)) + (resolve :ok))) + (catch Exception e + (reject e)) + (finally + (async/close! output)))))))) + +;;-------------------------------------------------------------------------------------- +;; Encoder +;;-------------------------------------------------------------------------------------- +(defn- encode-header [os compressed? len] + (.write os (int (if compressed? 1 0))) + (.write os (num->bytes len))) + +(defn- encode-uncompressed + ([msg os] + (encode-uncompressed msg (pb/length msg) os)) + ([msg len os] + (encode-header os false len) + (pb/->pb msg os))) + +(defn- encode-compressed-buffer [buf len os] + (encode-header os true len) + (.write os buf)) + +(defn- compress-msg [compressor msg] + (let [os (ByteArrayOutputStream.) + cos (compressor os)] + (pb/->pb msg cos) + (.close cos) + (.toByteArray os))) + +(defn- encode-maybe-compressed + "This function will encode the message either with or without compression, + depending on whichever results in the smaller message" + [msg compressor os] + (let [buf (compress-msg compressor msg) + clen (count buf) + len (pb/length msg)] + (if (< clen len) + (encode-compressed-buffer buf clen os) + (encode-uncompressed msg len os)))) + +;;-------------------------------------------------------------------------------------------- +(defn encode + " +Takes an input and output [core.async channel](https://clojuredocs.org/clojure.core.async/chan), along with an optional codec and +encodes a stream of 0 or more [length-prefixed-message](https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests) +messages. + +#### Parameters + +| Value | Type | Description | +|-----------------|----------------------|---------------------------------------------------------------------------| +| **f** | _(fn [init])_ | A protobuf encoder function with an arity of 1 that accepts an init-map, such as the new-XX functions produced by the protoc-gen-clojure compiler | +| **input** | _core.async/channel_ | A core.async channel expected to carry maps that will be transformed to protobuf messages based on _f_ | +| **output** | _core.async/channel_ | A core.async channel that will carry bytes representing the encoded messages. See _max-frame-size_. Will be closed when the pipeline shuts down. | +| **options** | _map_ | See _Options_ | + +##### Options + +| Value | Type | Default | Description | +|--------------------|---------------------------------------------------------------------------------------------------------------| +| **content-coding** | _String_ | \"identity\" | See the _content-coding_ section of [[decode]] | +| **codecs** | _map_ | [[protojure.grpc.codec.compression/builtin-codecs]] | The dictionary of codecs to utilize | +| **max-frame-size** | _UInt32_ | 0 | Maximum frame-size emitted on _output_ channel. See _Output channel details_ below | + +##### Output channel details + +The _max-frame-size_ option dictates how bytes are encoded on the _output_ channel: + + - **0**: (Default) Indicates 'stream' mode: Bytes are encoded one byte per [(put!)](https://clojuredocs.org/clojure.core.async/put!). + - **>0**: Indicates 'framed' mode: The specified value will dictate the upper bound on byte-array based frames emitted to the output channel. + +###### Example + +``` +{:content-coding \"gzip\" + :codecs mycodecs + :max-frame-size 16384} +``` + " + [f input output {:keys [codecs content-coding max-frame-size] :or {codecs compression/builtin-codecs} :as options}] + (let [os (OutputStream. {:ch output :max-frame-size max-frame-size}) + compressor (when (and (some? content-coding) (not= content-coding "identity")) + (compression/compressor codecs content-coding))] + (p/promise + (fn [resolve reject] + (go + (try + (loop [] + (if-let [_msg (!! ! go go-loop onto-chan] :as async] + [clojure.tools.logging :as log] + [protojure.grpc.client.api :as api] + [protojure.grpc.codec.lpm :as lpm] + [protojure.internal.grpc.client.providers.http2.jetty :as jetty] + [promesa.core :as p]) + (:refer-clojure :exclude [resolve])) + +(defn- input-pipeline + "'inputs' to the GRPC function, e.g. parameters, are LPM encoded in the request-body" + [{:keys [f] :as input} codecs content-coding max-frame-size] + (when (some? input) + (let [input-ch (:ch input) + output-ch (async/chan 16)] + (lpm/encode f input-ch output-ch {:codecs codecs :content-coding content-coding :max-frame-size max-frame-size}) + output-ch))) + +(defn- codecs-to-accept [codecs] + (clojure.string/join "," (cons "identity" (keys codecs)))) + +(defn- send-request + "Sends an HTTP2 based POST request that adheres to the GRPC-HTTP2 specification" + [context uri codecs content-coding {:keys [metadata service method options] :as params} input-ch meta-ch output-ch] + (log/trace (str "Invoking GRPC \"" service "/" method "\"")) + (let [hdrs (-> {"content-type" "application/grpc+proto" + "grpc-encoding" (or content-coding "identity") + "grpc-accept-encoding" (codecs-to-accept codecs)} + (merge metadata)) + url (str uri "/" service "/" method)] + (jetty/send-request context {:method "POST" + :url url + :headers hdrs + :input-ch input-ch + :meta-ch meta-ch + :output-ch output-ch}))) + +(defn- receive-headers + "Listen on the metadata channel _until_ we receive a status code. We are interested in both + ensuring the call was successful (e.g. :status == 200) and we want to know what :content-coding + may be applied to any response-body LPM protobufs. Therefore, we must gate any further + processing until we have received the \"headers\", and we assume we have fully received them + once we see the :status tag. We also note that the metadata channel is not expected to close + before :status has been received, and nor do we expect it to close even after we've received + :status since we will presumably be receiving trailers in the future. Therefore, we treat + core.async channel closure as an error, and terminate the processing once the response contains + the :status code. + " + [meta-ch request] + (p/promise + (fn [resolve reject] + (go-loop [response {}] + (if-let [data ( status (= 200)) + (-> (p/all [(receive-body codecs data-ch output response) + (receive-trailers meta-ch response)]) + (p/then (fn [[_ {{:strs [grpc-status grpc-message]} :trailers :as response}]] ;; [body-response trailers-response] + (let [grpc-status (decode-grpc-status grpc-status)] + (if (zero? grpc-status) + (-> {:status grpc-status} + (cond-> (some? grpc-message) (assoc :message grpc-message))) + (p/rejected (ex-info "bad grpc-status response" {:status grpc-status :message grpc-message :meta {:response response}}))))))) + (p/rejected (ex-info "bad status response" {:response response})))) + +;;----------------------------------------------------------------------------- +;;----------------------------------------------------------------------------- +;; External API +;;----------------------------------------------------------------------------- +;;----------------------------------------------------------------------------- + +;;----------------------------------------------------------------------------- +;; Provider +;;----------------------------------------------------------------------------- +(deftype Http2Provider [context uri codecs content-coding max-frame-size] + api/Provider + + (invoke [_ {:keys [input output] :as params}] + (let [input-ch (input-pipeline input codecs content-coding max-frame-size) + meta-ch (async/chan 32) + output-ch (when (some? output) (async/chan 16384))] + (-> (send-request context uri codecs content-coding params input-ch meta-ch output-ch) + (p/then (partial receive-headers meta-ch)) + (p/then (partial receive-payload codecs meta-ch output-ch output)) + (p/then (fn [status] + (log/trace "GRPC completed:" status) + status)) + (p/catch (fn [ex] + (log/error "GRPC failed:" ex) + (throw ex)))))) + + (disconnect [_] + (jetty/disconnect context))) diff --git a/src/protojure/internal/grpc/client/providers/http2/jetty.clj b/src/protojure/internal/grpc/client/providers/http2/jetty.clj new file mode 100644 index 0000000..24bf111 --- /dev/null +++ b/src/protojure/internal/grpc/client/providers/http2/jetty.clj @@ -0,0 +1,190 @@ +;; Copyright © 2019 State Street Bank and Trust Company. All rights reserved +;; +;; SPDX-License-Identifier: Apache-2.0 + +(ns protojure.internal.grpc.client.providers.http2.jetty + (:require [promesa.core :as p] + [clojure.core.async :refer [>!! ! go go-loop] :as async] + [clojure.tools.logging :as log]) + (:import (java.net InetSocketAddress) + (java.nio ByteBuffer) + (org.eclipse.jetty.http2.client HTTP2Client) + (org.eclipse.jetty.http2.api Stream$Listener) + (org.eclipse.jetty.http2.api.server ServerSessionListener$Adapter) + (org.eclipse.jetty.http2.frames HeadersFrame + DataFrame) + (org.eclipse.jetty.util Promise Callback) + (org.eclipse.jetty.http HttpFields + HttpURI + HttpVersion + MetaData$Request)) + (:refer-clojure :exclude [resolve])) + +;;------------------------------------------------------------------------------------ +;; Utility functions +;;------------------------------------------------------------------------------------ + +(defn- jetty-promise + "converts a jetty promise to promesa" + [f] + (p/promise + (fn [resolve reject] + (let [p (reify Promise + (succeeded [_ result] + (resolve result)) + (failed [_ error] + (reject error)))] + (f p))))) + +(defn- jetty-callback-promise + "converts a jetty 'callback' to promesa" + [f] + (p/promise + (fn [resolve reject] + (let [cb (reify Callback + (succeeded [_] + (resolve true)) + (failed [_ error] + (reject error)))] + (f cb))))) + +(defn- ->fields + "converts a map of [string string] name/value attributes to a jetty HttpFields container" + [headers] + (let [fields (new HttpFields)] + (run! (fn [[k v]] (.put fields ^String k ^String v)) headers) + fields)) + +(defn- fields-> + "converts jetty HttpFields container to a [string string] map" + [fields] + (->> (.iterator fields) + (iterator-seq) + (reduce (fn [acc x] + (assoc acc (.getName x) (.getValue x))) {}))) + +(defn- build-request + "Builds a HEADERFRAME representing our request" + [{:keys [method headers url] :or {method "GET" headers {}} :as request} last?] + (log/trace "Sending request:" request "ENDFRAME=" last?) + (let [_uri (HttpURI. ^String url)] + (as-> (->fields headers) $ + (MetaData$Request. method _uri HttpVersion/HTTP_2 $) + (HeadersFrame. $ nil last?)))) + +(defn- close-all! [& channels] + (run! (fn [ch] (when (some? ch) (async/close! ch))) channels)) + +(defn- stream-log [sev stream & msg] + (log/log sev (apply str (cons (str "STREAM " (.getId stream) ": ") msg)))) + +(defn- receive-listener + "Implements a org.eclipse.jetty.http2.api.Stream.Listener set of callbacks" + [meta-ch data-ch] + (let [end-stream! (fn [stream] (stream-log :trace stream "Closing") (close-all! meta-ch data-ch))] + (reify Stream$Listener + (onHeaders [_ stream frame] + (let [metadata (.getMetaData frame) + fields (fields-> (.getFields metadata)) + data (if (.isResponse metadata) + (let [status (.getStatus metadata) + reason (.getReason metadata)] + (-> {:headers fields} + (cond-> (some? status) (assoc :status status)) + (cond-> (some? reason) (assoc :reason reason)))) + {:trailers fields}) + last? (.isEndStream frame)] + (stream-log :trace stream "Received HEADER-FRAME: " data " ENDFRAME=" last?) + (>!! meta-ch data) + (when last? + (end-stream! stream)))) + (onData [_ stream frame callback] + (let [data (.getData frame) + len (.remaining data) + last? (.isEndStream frame)] + (stream-log :trace stream "Received DATA-FRAME (" len " bytes) ENDFRAME=" last?) + (when (some? data-ch) + (doseq [b (repeatedly len #(.get data))] + (async/put! data-ch (bit-and 0xff b)))) ;; FIXME: cast to byte? + (when last? + (end-stream! stream))) + (.succeeded callback)) + (onFailure [_ stream error reason callback] + (stream-log :error stream "FAILURE: " error) + (>!! meta-ch {:error {:type :failure :code error :reason reason}}) + (end-stream! stream) + (.succeeded callback)) + (onReset [_ stream frame] + (stream-log :error stream "Received RST-FRAME") + (let [error (.getError frame)] + (>!! meta-ch {:error {:type :reset :code error}}) + (end-stream! stream))) + (onIdleTimeout [_ stream ex] + (stream-log :error stream "Timeout") + (>!! meta-ch {:error {:type :timeout :error ex}}) + (end-stream! stream)) + (onClosed [_ stream] + (stream-log :trace stream "Closed")) + (onPush [_ stream frame] + (stream-log :trace stream "Received PUSH-FRAME"))))) + +(defn- transmit-data-frame + "Transmits a single DATA frame" + ([stream data] + (transmit-data-frame stream data false 0)) + ([stream data last? padding] + (stream-log :trace stream "Sending DATA-FRAME with " (count data) " bytes, ENDFRAME=" last?) + @(jetty-callback-promise + (fn [cb] + (let [frame (DataFrame. (.getId stream) (ByteBuffer/wrap data) last? padding)] + (.data stream frame cb)))))) + +(defn- transmit-eof + "Transmits an empty DATA frame with the ENDSTREAM flag set to true, signifying the end of stream" + [stream] + (transmit-data-frame stream (byte-array 0) true 0)) + +(defn- transmit-data-frames + "Creates DATA frames from the buffers on the channel" + [input stream] + (when (some? input) + (go-loop [] + (if-let [frame ( (jetty-promise + (fn [p] + (.connect client nil address listener p))) + (p/then (fn [session] + (let [context {:client client :session session}] + (log/debug "Session established:" context) + context)))))) + +(defn send-request + [{:keys [session] :as context} + {:keys [input-ch meta-ch output-ch] :as request}] + (let [request-frame (build-request request (nil? input-ch)) + listener (receive-listener meta-ch output-ch)] + (-> (jetty-promise + (fn [p] + (.newStream session request-frame p listener))) + (p/then (partial transmit-data-frames input-ch)) + (p/catch (fn [ex] (close-all! meta-ch output-ch) (throw ex)))))) + +(defn disconnect [{:keys [client] :as context}] + (log/debug "Disconnecting:" context) + (.stop client) + (dissoc context :client :session)) diff --git a/src/protojure/internal/grpc/codec/io.clj b/src/protojure/internal/grpc/codec/io.clj new file mode 100644 index 0000000..382cf84 --- /dev/null +++ b/src/protojure/internal/grpc/codec/io.clj @@ -0,0 +1,130 @@ +;; Copyright © 2019 State Street Bank and Trust Company. All rights reserved +;; +;; SPDX-License-Identifier: Apache-2.0 + +(ns protojure.internal.grpc.codec.io + (:require [clojure.core.async :refer [! > (take-available ch) + (take (dec max-frame-size)) ;; -1 to account for first-byte + (cons first-byte) + (byte-array))) + +(defn- bytes-to-frames + "Creates byte-array frames from the stream of bytes available on the input channel" + [input output max-frame-size] + (go + (try + (loop [] + (when-let [data (! output (build-data-frame input data max-frame-size))) + (recur))) + (catch Exception e + (log/error e)) + (finally + (async/close! output))))) + +(defn- os-init-framed [{:keys [max-frame-size] :as options}] + (let [bytes-ch (async/chan max-frame-size) + frames-ch (:ch options)] + (bytes-to-frames bytes-ch frames-ch max-frame-size) + {:ch bytes-ch :framed? true})) + +(defn- os-init [{:keys [ch max-frame-size] :as options}] + (if (and (some? max-frame-size) (pos? max-frame-size)) + [[] (os-init-framed options)] + [[] {:ch ch :framed? false}])) + +(defn- os-flush + "Sends a ':flush' signal to our framer when used in 'framed?=true' mode, NOP for streaming mode. + N.B. The flush may or may not be result in an immediate write to the underlying sink since + the framing layer may try to coalesce writes at its own discretion" + [this] + (let [{:keys [ch framed?]} (.state this)] + (when framed? + (async/put! ch :flush)))) + +(defn- os-close + [this] + (let [{:keys [ch]} (.state this)] + (async/close! ch))) + +(defn- os-write-int + [this b] + (let [{:keys [ch]} (.state this)] + (async/put! ch (bit-and b 0xFF)))) diff --git a/src/protojure/internal/pedestal/io.clj b/src/protojure/internal/pedestal/io.clj new file mode 100644 index 0000000..2422f98 --- /dev/null +++ b/src/protojure/internal/pedestal/io.clj @@ -0,0 +1,34 @@ +;; Copyright © 2019 State Street Bank and Trust Company. All rights reserved +;; +;; SPDX-License-Identifier: Apache-2.0 + +(ns protojure.internal.pedestal.io + (:import (protojure.internal.grpc.io InputStream))) + +(gen-class + :name protojure.pedestal.io.InputStream + :extends javax.servlet.ServletInputStream + :state state + :init init + :constructors {[Object] []} + :exposes-methods {read parentRead}) + +(defn- -init [channel] + [[] {:is (InputStream. {:ch channel})}]) + +(defn- -available + [this] + (let [{:keys [is]} (.state this)] + (.available is))) + +(defn- -read + "Reads the next byte of data from the input stream. The value byte is returned as an int in the range 0 to 255. + See InputStream for further details." + ([this bytes offset len] + (.parentRead this bytes offset len)) + ([this bytes] + (.parentRead this bytes)) + ([this] + (let [{:keys [is]} (.state this)] + (.read is)))) + diff --git a/src/protojure/pedestal/core.clj b/src/protojure/pedestal/core.clj new file mode 100644 index 0000000..cc7dcb3 --- /dev/null +++ b/src/protojure/pedestal/core.clj @@ -0,0 +1,283 @@ +;; Copyright © 2019 State Street Bank and Trust Company. All rights reserved +;; +;; SPDX-License-Identifier: Apache-2.0 + +(ns protojure.pedestal.core + "A [Pedestal](http://pedestal.io/) [chain provider](http://pedestal.io/reference/chain-providers) compatible with simultaneously serving both [GRPC-HTTP2](https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md) and [GRPC-WEB](https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-WEB.md) protocols" + (:require [io.pedestal.http :as http] + [io.pedestal.interceptor.chain :as pedestal.chain] + [io.pedestal.interceptor.helpers :as pedestal.interceptors] + [io.pedestal.log :as log] + [clojure.string :as string] + [clojure.pprint :refer [pprint]] + [clojure.core.async :refer [go-loop !! >! close! put! timeout poll!]] + [promesa.core :as p] + [clojure.java.io :as io] + [protojure.pedestal.ssl :as ssl]) + (:import (io.undertow.server HttpHandler + HttpServerExchange) + (io.undertow Undertow + UndertowOptions) + (io.undertow.server.protocol.http HttpAttachments) + (io.undertow.util HeaderMap + HttpString) + (java.io InputStream) + (io.undertow.io Receiver$PartialBytesCallback Receiver$ErrorCallback) + (java.nio ByteBuffer)) + (:refer-clojure :exclude [resolve])) + +(defn- assoc-header + "Associates an undertow header entry as a tuple in a map" + [headers map key] + (assoc map (string/lower-case key) (.get headers key 0))) + +(defn- get-request-headers + "Create a map of request header elements" + [exchange] + (let [headers (.getRequestHeaders exchange)] + (reduce (partial assoc-header headers) {} (.getHeaderNames headers)))) + +(defn- write-trailers + "Places the tuples representing trailers to undertow with a putAttachment operation" + [exchange trailers] + (let [data (HeaderMap.)] + (doseq [[k v] trailers] + (.add data (HttpString. (name k)) v)) + (.putAttachment exchange HttpAttachments/RESPONSE_TRAILERS data))) + +(defn- read-stream-chunk + "Chunks reads from an input-stream, returning either a byte-array when data is found, or nil for EOF" + [stream requested-size] + (let [buf (byte-array requested-size) + actual-size (.read stream buf)] + (cond + (= actual-size -1) nil ;; EOF + (< actual-size requested-size) (byte-array actual-size buf) ;; resize if smaller + :else buf))) + +(defn- byte-chunk-seq + "Returns a lazy sequence of byte-array chunks of length 'size' from an input stream" + [stream size] + (lazy-seq (when-let [buf (read-stream-chunk stream size)] + (cons buf (byte-chunk-seq stream size))))) + +(defn- async-poll-seq + "Returns a lazy sequence of (non-blocking) items available on a core.async channel" + [ch] + (lazy-seq (when-some [data (poll! ch)] + (cons data (async-poll-seq ch))))) + +(defn- write-data + "Writes the provided bytes to the undertow response channel" + [ch data] + (.write ch (ByteBuffer/wrap data))) + +(defn- flush + [ch] + (while (not (.flush ch)))) + +(defn- write-data-coll + "Writes and flushes an entire collection" + [ch coll] + (run! (partial write-data ch) coll) + (flush ch)) + +(defn- write-direct-data + "Used for trivial response bodies, such as String or byte types" + [ch data] + (p/resolved + (do + (write-data ch data) + (flush ch)))) + +(defn- write-streaming-data + "Used for InputStream type response bodies. Will chunk the data to avoid + overburdening the heap" + [ch is] + (p/resolved (write-data-coll ch (byte-chunk-seq is 65536)))) + +(defn- write-available-async-data + "Drains all remaining data from a core.async channel and flushes it to the response channel" + [output-ch input-ch] + (write-data-coll output-ch (async-poll-seq input-ch))) + +(defn- write-async-data + "Used for core.async type response bodies. Each message received is assumed + to represent a data frame and thus will be flushed" + [output-ch input-ch] + (p/promise + (fn [resolve reject] + (write-available-async-data output-ch input-ch) + (go + (try + (loop [] + (if-let [data (!! ch (bit-and 0xff b))) + (when last + (close! ch) + (resolve true)) + (.resume receiver))) + (reify Receiver$ErrorCallback + (error [this exchange e] + (close! ch) + (reject e)))))))) + +(defmulti ^:no-doc transmit-body + "Handle transmitting the response body based on the type" + (fn [ch resp-body] (type resp-body))) +(defmethod transmit-body clojure.core.async.impl.channels.ManyToManyChannel + [ch resp-body] + (write-async-data ch resp-body)) +(defmethod transmit-body String + [ch resp-body] + (write-direct-data ch (.getBytes resp-body))) +(defmethod transmit-body InputStream + [ch resp-body] + (write-streaming-data ch resp-body)) +(defmethod transmit-body java.io.File + [ch resp-body] + (with-open [is (io/input-stream resp-body)] + (write-streaming-data ch is))) +(defmethod transmit-body :default + [ch resp-body] + (write-direct-data ch resp-body)) + +(defmulti ^:no-doc transmit-trailers + "Handle transmitting the trailers based on the type" + (fn [exchange trailers] (type trailers))) +(defmethod transmit-trailers clojure.core.async.impl.channels.ManyToManyChannel + [exchange trailers] + (p/promise + (fn [resolve reject] + (go + (resolve (write-trailers exchange ( (p/all [input-status + (transmit-body output-ch body) + (transmit-trailers exchange trailers)]) + (p/then (fn [_] (close-output-channel exchange output-ch))) + (p/catch (fn [ex] + (log/error "Error:" (with-out-str (pprint ex))) + (.endExchange exchange)))))) + +(defn provider + "Generates our undertow provider, which defines the callback point between + the undertow container and pedestal by dealing with the dispatch interop. + Our real work occurs in the (request) form above" + [service-map] + (let [interceptors (::http/interceptors service-map)] + (assoc service-map ::handler + (reify HttpHandler + (handleRequest [this exchange] + (handle-request interceptors exchange)))))) + +(defn config + "Given a service map (with interceptor provider established) and a server-opts map, + Return a map of :server, :start-fn, and :stop-fn. + Both functions are 0-arity" + [service-map + {:keys [host port] + {:keys [ssl-port] :as ssl-config} :container-options + :or {host "127.0.0.1"}}] + (let [handler (::handler service-map) + server (-> (Undertow/builder) + (cond-> + ;; start http listener when no ssl-context is set + ;; or if ssl-port is set in addition to port + (or (nil? ssl-config) + (and port ssl-port)) + (.addHttpListener port host) + ;; listens on port unless ssl-port is set + (some? ssl-config) + (.addHttpsListener (or ssl-port port) host (ssl/keystore-> ssl-config))) + (.setServerOption UndertowOptions/ENABLE_HTTP2 true) + (.setHandler handler) + (.build))] + {:server server + :start-fn (fn [] + (.start server) + server) + :stop-fn (fn [] + (.stop server) + server)})) diff --git a/src/protojure/pedestal/interceptors/grpc.clj b/src/protojure/pedestal/interceptors/grpc.clj new file mode 100644 index 0000000..80554c2 --- /dev/null +++ b/src/protojure/pedestal/interceptors/grpc.clj @@ -0,0 +1,98 @@ +;; Copyright © 2019 State Street Bank and Trust Company. All rights reserved +;; +;; SPDX-License-Identifier: Apache-2.0 + +(ns protojure.pedestal.interceptors.grpc + "A [Pedestal](http://pedestal.io/) [interceptor](http://pedestal.io/reference/interceptors) for [GRPC](https://grpc.io/) support" + (:require [clojure.core.async :refer [go >! > (clojure.string/split accepted-encodings #",") + (filter supported-encodings) + (first))) + +(defn- generate-trailers + [{:keys [grpc-status grpc-message] :or {grpc-status 0}}] + (-> {"grpc-status" grpc-status} + (cond-> (some? grpc-message) (assoc "grpc-message" grpc-message)))) + +(defn- create-req-ctx + [f {:keys [body-ch] {:strs [grpc-encoding] :or {grpc-encoding "identity"}} :headers :as req}] + (let [in body-ch + out (async/chan 16)] + {:in in + :out out + :encoding grpc-encoding + :status (lpm/decode f in out {:content-coding grpc-encoding})})) + +(defn- create-resp-ctx + [f {{:strs [grpc-accept-encoding] :or {grpc-accept-encoding ""}} :headers :as req}] + (let [in (async/chan 16) + out (async/chan 16) + encoding (or (determine-output-encoding grpc-accept-encoding) "identity")] + {:in in + :out out + :encoding encoding + :status (lpm/encode f in out {:content-coding encoding :max-frame-size 16383})})) + +(defn- set-params [context params] + (assoc-in context [:request :grpc-params] params)) + +(defn- grpc-enter + " interceptor for handling GRPC requests" + [{:keys [server-streaming client-streaming input output] :as rpc-metadata} + {:keys [request] :as context}] + (let [req-ctx (create-req-ctx input request) + resp-ctx (create-resp-ctx output request) + input-ch (:out req-ctx) + context (-> context + (assoc ::ctx {:req-ctx req-ctx :resp-ctx resp-ctx}) + (cond-> server-streaming + (assoc-in [:request :grpc-out] (:in resp-ctx))))] + + ;; set :grpc-params + (if client-streaming + (set-params context input-ch) ;; client-streaming means simply pass the channel directly + (if-let [params (async/poll! input-ch)] + (set-params context params) ;; materialize unary params opportunistically, if available + (go (set-params context ( interceptor for handling GRPC responses" + [{:keys [server-streaming] :as rpc-metadata} + {:keys [response] {:keys [req-ctx resp-ctx]} ::ctx :as context}] + + ;; special-case unary return types + (when-not server-streaming + (let [output-ch (:in resp-ctx)] + (async/put! output-ch (:body response)) + (async/close! output-ch))) + + (let [trailers-ch (async/promise-chan)] + ;; defer sending trailers until our IO has completed + (-> (p/all (mapv :status [req-ctx resp-ctx])) + (p/then (fn [_] (async/put! trailers-ch (generate-trailers response)))) + (p/catch (fn [ex] + (log/error "Pipeline error: " ex) + (async/put! trailers-ch (generate-trailers {:grpc-status 13}))))) + + (update context :response + #(assoc % + :headers {"Content-Type" "application/grpc+proto" + "grpc-encoding" (:encoding resp-ctx)} + :status 200 + :body (:out resp-ctx) + :trailers trailers-ch)))) + +(defn interceptor + [rpc-metadata] + (pedestal/interceptor {:name ::interceptor + :enter (partial grpc-enter rpc-metadata) + :leave (partial grpc-leave rpc-metadata)})) diff --git a/src/protojure/pedestal/interceptors/grpc_web.clj b/src/protojure/pedestal/interceptors/grpc_web.clj new file mode 100644 index 0000000..317c700 --- /dev/null +++ b/src/protojure/pedestal/interceptors/grpc_web.clj @@ -0,0 +1,43 @@ +;; Copyright © 2019 State Street Bank and Trust Company. All rights reserved +;; +;; SPDX-License-Identifier: Apache-2.0 + +(ns protojure.pedestal.interceptors.grpc-web + "A [Pedestal](http://pedestal.io/) [interceptor](http://pedestal.io/reference/interceptors) for the [GRPC-WEB](https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-WEB.md) protocol" + (:require [io.pedestal.interceptor :refer [->Interceptor]]) + (:import (org.apache.commons.codec.binary Base64InputStream)) + (:refer-clojure :exclude [proxy])) + +(defn- decode-body + [{:keys [body] :as request}] + (assoc request :body (Base64InputStream. body))) + +(def ^{:no-doc true :const true} content-types + #{"application/grpc-web-text" + "application/grpc-web-text+proto"}) + +(defn- web-text? + [{{:strs [content-type]} :headers}] + (contains? content-types content-type)) + +(defn- pred-> + "Threads 'item' through both the predicate and, when 'pred' evaluates true, 'xform' functions. Else, just returns 'item'" + [item pred xform] + (cond-> item (pred item) xform)) + +(defn- enter-handler + [{:keys [request] :as ctx}] + (assoc ctx :request (pred-> request web-text? decode-body))) + +(defn- leave-handler + [ctx] + ;; TODO "Clarify & implement grpc-web trailer behavior" + ctx) + +(defn- exception-handler + [ctx e] + (assoc ctx :io.pedestal.interceptor.chain/error e)) + +(def proxy + "Interceptor that provides a transparent proxy for the [GRPC-WEB](https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-WEB.md) protocol to standard protojure grpc protocol" + (->Interceptor ::proxy enter-handler leave-handler exception-handler)) diff --git a/src/protojure/pedestal/routes.clj b/src/protojure/pedestal/routes.clj new file mode 100644 index 0000000..8c684a1 --- /dev/null +++ b/src/protojure/pedestal/routes.clj @@ -0,0 +1,20 @@ +;; Copyright © 2019 State Street Bank and Trust Company. All rights reserved +;; +;; SPDX-License-Identifier: Apache-2.0 + +(ns protojure.pedestal.routes + "Utilities for generating GRPC endpoints as [Pedestal Routes](http://pedestal.io/guides/defining-routes)" + (:require [protojure.pedestal.interceptors.grpc :as grpc] + [protojure.pedestal.interceptors.grpc-web :as grpc.web] + [io.pedestal.interceptor.helpers :as pedestal])) + +(defn ->tablesyntax + "Generates routes in [Table Syntax](http://pedestal.io/reference/table-syntax) format" + [{:keys [rpc-metadata interceptors callback-context] :as options}] + (for [{:keys [pkg service method method-fn] :as rpc} rpc-metadata] + (let [fqs (str pkg "." service) + name (keyword fqs (str method "-handler")) + handler (pedestal/handler name (partial method-fn callback-context))] + [(str "/" fqs "/" method) + :post (conj interceptors grpc.web/proxy (grpc/interceptor rpc) handler) + :route-name name]))) \ No newline at end of file diff --git a/src/protojure/pedestal/ssl.clj b/src/protojure/pedestal/ssl.clj new file mode 100644 index 0000000..4e79b5f --- /dev/null +++ b/src/protojure/pedestal/ssl.clj @@ -0,0 +1,31 @@ +;; Copyright © 2019 State Street Bank and Trust Company. All rights reserved +;; +;; SPDX-License-Identifier: Apache-2.0 + +(ns protojure.pedestal.ssl + (:require [clojure.java.io :as io]) + (:import (javax.net.ssl SSLContext KeyManagerFactory) + (java.security KeyStore))) + +(defn- load-keystore + [keystore password] + (if (instance? KeyStore keystore) + keystore + (with-open [in (io/input-stream keystore)] + (doto (KeyStore/getInstance (KeyStore/getDefaultType)) + (.load in (.toCharArray password)))))) + +(defn- keystore->key-managers + "Return a KeyManager[] given a KeyStore and password" + [keystore password] + (.getKeyManagers + (doto (KeyManagerFactory/getInstance (KeyManagerFactory/getDefaultAlgorithm)) + (.init keystore (.toCharArray password))))) + +(defn keystore-> + "Turn a keystore, which may be either strings denoting file paths or actual KeyStore + instances, into an SSLContext instance" + [{:keys [keystore key-password]}] + (let [ks (load-keystore keystore key-password)] + (doto (SSLContext/getInstance "TLS") + (.init (keystore->key-managers ks key-password) nil nil)))) diff --git a/src/protojure/protobuf.clj b/src/protojure/protobuf.clj new file mode 100644 index 0000000..d3b294a --- /dev/null +++ b/src/protojure/protobuf.clj @@ -0,0 +1,24 @@ +;; Copyright © 2019 State Street Bank and Trust Company. All rights reserved +;; +;; SPDX-License-Identifier: Apache-2.0 + +(ns protojure.protobuf + "Main API entry point for protobuf applications" + (:import (com.google.protobuf + CodedOutputStream))) + +(defprotocol Writer + (serialize [this os]) + (length [this])) + +(defn ->pb + "Serialize a record implementing the [[Writer]] protocol into protobuf bytes." + ([msg] + (let [len (length msg) + data (byte-array len)] + (->pb msg data) + data)) + ([msg output] + (let [os (CodedOutputStream/newInstance output)] + (serialize msg os) + (.flush os)))) diff --git a/src/protojure/protobuf/serdes.clj b/src/protojure/protobuf/serdes.clj new file mode 100644 index 0000000..bb59da4 --- /dev/null +++ b/src/protojure/protobuf/serdes.clj @@ -0,0 +1,253 @@ +;; Copyright © 2019 State Street Bank and Trust Company. All rights reserved +;; +;; SPDX-License-Identifier: Apache-2.0 + +(ns protojure.protobuf.serdes + "Serializer/deserializer support for fundamental protobuf types." + (:require [protojure.protobuf :as pb]) + (:import (com.google.protobuf CodedInputStream + CodedOutputStream + WireFormat + UnknownFieldSet + ExtensionRegistry + ByteString))) + +(defn tag-map + " + Returns a lazy sequence consisting of the result of applying f to the set of + protobuf objects delimited by protobuf tags. + + #### Parameters + + | Value | Type | Description | + |----------|--------------------|------------------------------------------------------------------------------------------------| + | **init** | _map_ | A map of initial values | + | **f** | _(fn [tag index])_ | An arity-2 function that accepts a tag and index and returns a [k v] (see _Return type_ below) | + | **is** | [CodedInputStream](https://developers.google.com/protocol-buffers/docs/reference/java/com/google/protobuf/CodedInputStream) | An input stream containing serialized protobuf data | + + #### Return Type + + _f_ should evaluate to a 2-entry vector in the form [key value], where: + + - _key_ is either + - a keyword representing the field name when the index is known + - simply the index value when it is not + - _value_ is either + - a value that will be returned verbatim to be associated to the _key_ + - a function that will take a collection of previously deserialized values with the same tag and update it to incorporate the new value (to support _repeated_ types, etc) + + + #### Example + + ``` + (tag-map + (fn [tag index] + (case index + 1 [:currency_code (cis->String is)] + 2 [:units (cis->Int64 is)] + 3 [:nanos (cis->Int32 is)] + [index (cis->undefined tag is)])) + is)) + ``` + " + ([f is] + (tag-map {} f is)) + ([init f is] + (loop [acc init tag (.readTag is)] + (if (pos? tag) + (let [[k v] (f tag (bit-shift-right tag 3))] + (recur (if (fn? v) + (update acc k v) + (assoc acc k v)) + (.readTag is))) + acc)))) + +(defn- defparsefn [type] + (let [name (symbol (str "cis->" type)) + sym (str "read" type) + f (eval `(fn [is#] + (. is# ~(symbol sym)))) + m {:doc (format "Deserialize a '%s' type" type) + :arglists '([is])}] + (intern *ns* (with-meta name m) f))) + +(defn- defwritefn [type default?] + (let [name (symbol (str "write-" type)) + sym (str "write" type) + f (eval `(fn [os# tag# value#] + (. os# ~(symbol sym) tag# value#))) + m {:doc (format "Serialize a '%s' type" type) + :arglists '([tag value os] [tag {:keys [optimize] :or {optimize true} :as options} value os])}] + (intern *ns* (with-meta name m) + (fn self + ([tag value os] + (self tag {} value os)) + ([tag {:keys [optimize] :or {optimize true} :as options} value os] + (when-not (and optimize (default? value)) + (f os tag value))))))) + +(defn- defsizefn [type default?] + (let [name (symbol (str "size-" type)) + sym (str "compute" type "Size") + f (eval `(fn [tag# value#] + (. CodedOutputStream ~(symbol sym) tag# value#))) + m {:doc (format "Compute length of serialized '%s' type" type) + :arglists '([tag value] [tag {:keys [optimize] :or {optimize true} :as options} value])}] + (intern *ns* (with-meta name m) + (fn self + ([tag value] + (self tag {} value)) + ([tag {:keys [optimize] :or {optimize true} :as options} value] + (if-not (and optimize (default? value)) + (f tag value) + 0)))))) + +(defn- defallfn [type default?] + (defparsefn type) + (defwritefn type default?) + (defsizefn type default?)) + +(def ^:no-doc numeric-scalars + ["Double" + "Enum" + "Fixed32" + "Fixed64" + "Float" + "Int32" + "Int64" + "SFixed32" + "SFixed64" + "SInt32" + "SInt64" + "UInt32" + "UInt64"]) + +(defn- init [] + (doseq [type numeric-scalars] + (defallfn type #(or (nil? %) (zero? %)))) + + (defallfn "String" empty?) + (defallfn "Bool" #(not (true? %)))) + +(init) + +;; manually implement the "Bytes" scalar so we can properly handle native byte-array import/export +(defn cis->Bytes + "Deserialize 'Bytes' type" + [is] + (.toByteArray (.readBytes is))) + +(defn write-Bytes + "Serialize 'Bytes' type" + ([tag value os] + (write-Bytes tag {} value os)) + ([tag {:keys [optimize] :or {optimize true} :as options} value os] + (when-not (and optimize (empty? value)) + (let [bytestring (ByteString/copyFrom value)] + (.writeBytes os tag bytestring))))) + +(defn size-Bytes + "Compute length of serialized 'Bytes' type" + ([tag value] + (size-Bytes tag {} value)) + ([tag {:keys [optimize] :or {optimize true} :as options} value] + (if-not (and optimize (empty? value)) + (let [bytestring (ByteString/copyFrom value)] + (CodedOutputStream/computeBytesSize tag bytestring)) + 0))) + +(defn cis->undefined + "Deserialize an unknown type, retaining its tag/type" + [tag is] + (let [num (WireFormat/getTagFieldNumber tag) + type (WireFormat/getTagWireType tag)] + (case type + 0 (.readInt64 is) + 1 (.readFixed64 is) + 2 (.readBytes is) + 3 (.readGroup is num (UnknownFieldSet/newBuilder) (ExtensionRegistry/getEmptyRegistry)) + 4 nil + 5 (.readFixed32 is)))) + +(defn cis->embedded + "Deserialize an embedded type, where **f** is an (fn) that can deserialize the embedded message" + [f is] + (let [len (.readRawVarint32 ^CodedInputStream is) + lim (.pushLimit is len)] + (let [result (f is)] + (.popLimit is lim) + result))) + +(defn cis->map + "Deserialize a wire format map-type to user format [key val]" + [f is] + (let [{:keys [key value]} (f is)] + (partial into {key value}))) + +(defn cis->repeated + "Deserialize an 'unpacked' repeated type (see [[cis->packablerepeated]])" + [f is] + (fn [coll] + (conj (or coll []) (f is)))) + +(defn cis->packedrepeated + "Deserialize a 'packed' repeated type (see [[cis->packablerepeated]])" + [f is] + (fn [coll] + (let [len (.readRawVarint32 ^CodedInputStream is)] + (reduce conj (or coll []) (repeatedly len #(f is)))))) + +(defn cis->packablerepeated + " + Deserialize a repeated type which may optionally support [packed format](https://developers.google.com/protocol-buffers/docs/encoding#packed). + The field type will indicate unpacked (0) vs packed (2). + " + [tag f is] + (let [type (WireFormat/getTagWireType tag)] + (case type + 0 (cis->repeated f is) + 2 (cis->packedrepeated f is) + (cis->undefined tag is)))) + +(defn write-embedded + "Serialize an embedded type along with tag/length metadata" + [tag item os] + (let [len (if (some? item) (pb/length item) 0)] + (when-not (zero? len) + (.writeTag os tag 2);; embedded messages are always type=2 (string) + (.writeUInt32NoTag os len) + (pb/serialize item os)))) + +;; FIXME: Add support for optimizing packable types +(defn write-repeated + "Serialize a repeated type" + [f tag items os] + (doseq [item items] + (f tag item os))) + +(defn write-map + "Serialize user format [key val] using given map item constructor" + [constructor tag items os] + (write-repeated write-embedded tag (map (fn [[key value]] (constructor {:key key :value value})) items) os)) + +(defn size-embedded + "Compute length of serialized embedded type, including the metadata header" + [tag item] + (let [len (if (some? item) (pb/length item) 0)] + (if-not (zero? len) + (+ + (size-UInt32 tag {:optimize false} len) ;; This accounts for the tag+length preamble + len) ;; And this is the embedded item itself + 0))) + +(defn size-repeated + "Compute length of serialized repeated type" + [f tag items] + (if-not (empty? items) + (reduce + (map (partial f tag) items)) + 0)) + +(defn size-map + "Compute length of user format [key val] using given map item constructor" + [constructor tag item] + (size-repeated size-embedded tag (map (fn [[key value]] (constructor {:key key :value value})) item))) diff --git a/test/example/hello.clj b/test/example/hello.clj new file mode 100644 index 0000000..9772f36 --- /dev/null +++ b/test/example/hello.clj @@ -0,0 +1,186 @@ +;;;---------------------------------------------------------------------------------- +;;; Generated by protoc-gen-clojure. DO NOT EDIT +;;; +;;; Message Implementation of package com.sttgts.omnia.hello +;;;---------------------------------------------------------------------------------- +(ns example.hello + (:require [protojure.protobuf :as pb] + [protojure.protobuf.serdes :refer :all] + [clojure.set :as set] + [clojure.spec.alpha :as s]) + (:import (com.google.protobuf + CodedInputStream))) + +;;---------------------------------------------------------------------------------- +;;---------------------------------------------------------------------------------- +;; Forward declarations +;;---------------------------------------------------------------------------------- +;;---------------------------------------------------------------------------------- + +(declare cis->HelloRequest) +(declare ecis->HelloRequest) +(declare new-HelloRequest) +(declare cis->RepeatHelloRequest) +(declare ecis->RepeatHelloRequest) +(declare new-RepeatHelloRequest) +(declare cis->HelloReply) +(declare ecis->HelloReply) +(declare new-HelloReply) + +;;---------------------------------------------------------------------------------- +;;---------------------------------------------------------------------------------- +;; Message Implementations +;;---------------------------------------------------------------------------------- +;;---------------------------------------------------------------------------------- + +;----------------------------------------------------------------------------- +; HelloRequest +;----------------------------------------------------------------------------- +(defrecord HelloRequest [name] + pb/Writer + + (serialize [this os] + (write-String 1 {:optimize true} (:name this) os)) + + (length [this] + (reduce + [(size-String 1 {:optimize true} (:name this))]))) + +(s/def :com.sttgts.omnia.hello.messages.HelloRequest/name string?) +(s/def ::HelloRequest-spec (s/keys :opt-un [:com.sttgts.omnia.hello.messages.HelloRequest/name])) +(def HelloRequest-defaults {:name ""}) + +(defn cis->HelloRequest + "CodedInputStream to HelloRequest" + [is] + (->> (tag-map HelloRequest-defaults + (fn [tag index] + (case index + 1 [:name (cis->String is)] + + [index (cis->undefined tag is)])) + is) + (map->HelloRequest))) + +(defn ecis->HelloRequest + "Embedded CodedInputStream to HelloRequest" + [is] + (cis->embedded cis->HelloRequest is)) + +(defn new-HelloRequest + "Creates a new instance from a map, similar to map->HelloRequest except that + it properly accounts for nested messages, when applicable. + " + [init] + {:pre [(if (s/valid? ::HelloRequest-spec init) true (throw (ex-info "Invalid input" (s/explain-data ::HelloRequest-spec init))))]} + (-> (merge HelloRequest-defaults init) + (map->HelloRequest))) + +(defn pb->HelloRequest + "Protobuf to HelloRequest" + [input] + (-> input + CodedInputStream/newInstance + cis->HelloRequest)) + +;----------------------------------------------------------------------------- +; RepeatHelloRequest +;----------------------------------------------------------------------------- +(defrecord RepeatHelloRequest [name count] + pb/Writer + + (serialize [this os] + (write-String 1 {:optimize true} (:name this) os) + (write-Int32 2 {:optimize true} (:count this) os)) + + (length [this] + (reduce + [(size-String 1 {:optimize true} (:name this)) + (size-Int32 2 {:optimize true} (:count this))]))) + +(s/def :com.sttgts.omnia.hello.messages.RepeatHelloRequest/name string?) +(s/def :com.sttgts.omnia.hello.messages.RepeatHelloRequest/count int?) +(s/def ::RepeatHelloRequest-spec (s/keys :opt-un [:com.sttgts.omnia.hello.messages.RepeatHelloRequest/name :com.sttgts.omnia.hello.messages.RepeatHelloRequest/count])) +(def RepeatHelloRequest-defaults {:name "" :count 0}) + +(defn cis->RepeatHelloRequest + "CodedInputStream to RepeatHelloRequest" + [is] + (->> (tag-map RepeatHelloRequest-defaults + (fn [tag index] + (case index + 1 [:name (cis->String is)] + 2 [:count (cis->Int32 is)] + + [index (cis->undefined tag is)])) + is) + (map->RepeatHelloRequest))) + +(defn ecis->RepeatHelloRequest + "Embedded CodedInputStream to RepeatHelloRequest" + [is] + (cis->embedded cis->RepeatHelloRequest is)) + +(defn new-RepeatHelloRequest + "Creates a new instance from a map, similar to map->RepeatHelloRequest except that + it properly accounts for nested messages, when applicable. + " + [init] + {:pre [(if (s/valid? ::RepeatHelloRequest-spec init) true (throw (ex-info "Invalid input" (s/explain-data ::RepeatHelloRequest-spec init))))]} + (-> (merge RepeatHelloRequest-defaults init) + (map->RepeatHelloRequest))) + +(defn pb->RepeatHelloRequest + "Protobuf to RepeatHelloRequest" + [input] + (-> input + CodedInputStream/newInstance + cis->RepeatHelloRequest)) + +;----------------------------------------------------------------------------- +; HelloReply +;----------------------------------------------------------------------------- +(defrecord HelloReply [message] + pb/Writer + + (serialize [this os] + (write-String 1 {:optimize true} (:message this) os)) + + (length [this] + (reduce + [(size-String 1 {:optimize true} (:message this))]))) + +(s/def :com.sttgts.omnia.hello.messages.HelloReply/message string?) +(s/def ::HelloReply-spec (s/keys :opt-un [:com.sttgts.omnia.hello.messages.HelloReply/message])) +(def HelloReply-defaults {:message ""}) + +(defn cis->HelloReply + "CodedInputStream to HelloReply" + [is] + (->> (tag-map HelloReply-defaults + (fn [tag index] + (case index + 1 [:message (cis->String is)] + + [index (cis->undefined tag is)])) + is) + (map->HelloReply))) + +(defn ecis->HelloReply + "Embedded CodedInputStream to HelloReply" + [is] + (cis->embedded cis->HelloReply is)) + +(defn new-HelloReply + "Creates a new instance from a map, similar to map->HelloReply except that + it properly accounts for nested messages, when applicable. + " + [init] + {:pre [(if (s/valid? ::HelloReply-spec init) true (throw (ex-info "Invalid input" (s/explain-data ::HelloReply-spec init))))]} + (-> (merge HelloReply-defaults init) + (map->HelloReply))) + +(defn pb->HelloReply + "Protobuf to HelloReply" + [input] + (-> input + CodedInputStream/newInstance + cis->HelloReply)) + diff --git a/test/example/hello/Greeter.clj b/test/example/hello/Greeter.clj new file mode 100644 index 0000000..cfcb6c1 --- /dev/null +++ b/test/example/hello/Greeter.clj @@ -0,0 +1,47 @@ +;;;---------------------------------------------------------------------------------- +;;; Generated by protoc-gen-clojure. DO NOT EDIT +;;; +;;; GRPC implementation of Greeter service from package example.hello +;;;---------------------------------------------------------------------------------- +(ns example.hello.Greeter + (:require [example.hello :refer :all])) + +;;---------------------------------------------------------------------------------- +;;---------------------------------------------------------------------------------- +;; GRPC Implementations +;;---------------------------------------------------------------------------------- +;;---------------------------------------------------------------------------------- + +;----------------------------------------------------------------------------- +; GRPC Greeter +;----------------------------------------------------------------------------- +(defprotocol Service + (SayHello [this param]) + (SayRepeatHello [this param]) + (SayHelloAfterDelay [this param]) + (SayHelloOnDemand [this param]) + (SayHelloError [this param])) + +(defn- SayHello-dispatch + [ctx request] + (SayHello ctx request)) +(defn- SayRepeatHello-dispatch + [ctx request] + (SayRepeatHello ctx request)) +(defn- SayHelloAfterDelay-dispatch + [ctx request] + (SayHelloAfterDelay ctx request)) +(defn- SayHelloOnDemand-dispatch + [ctx request] + (SayHelloOnDemand ctx request)) +(defn- SayHelloError-dispatch + [ctx request] + (SayHelloError ctx request)) + +(def ^:const rpc-metadata + [{:pkg "example.hello" :service "Greeter" :method "SayHello" :method-fn SayHello-dispatch :server-streaming false :client-streaming false :input pb->HelloRequest :output new-HelloReply} + {:pkg "example.hello" :service "Greeter" :method "SayRepeatHello" :method-fn SayRepeatHello-dispatch :server-streaming true :client-streaming false :input pb->RepeatHelloRequest :output new-HelloReply} + {:pkg "example.hello" :service "Greeter" :method "SayHelloAfterDelay" :method-fn SayHelloAfterDelay-dispatch :server-streaming false :client-streaming false :input pb->HelloRequest :output new-HelloReply} + {:pkg "example.hello" :service "Greeter" :method "SayHelloOnDemand" :method-fn SayHelloOnDemand-dispatch :server-streaming true :client-streaming true :input pb->HelloRequest :output new-HelloReply} + {:pkg "example.hello" :service "Greeter" :method "SayHelloError" :method-fn SayHelloError-dispatch :server-streaming false :client-streaming false :input pb->HelloRequest :output new-HelloReply}]) + diff --git a/test/example/types.clj b/test/example/types.clj new file mode 100644 index 0000000..8c77d76 --- /dev/null +++ b/test/example/types.clj @@ -0,0 +1,322 @@ +;; Copyright © 2019 State Street Bank and Trust Company. All rights reserved +;; +;; SPDX-License-Identifier: Apache-2.0 + +(ns example.types + (:require [protojure.protobuf :as pb] + [protojure.protobuf.serdes :refer :all]) + (:import (com.google.protobuf CodedInputStream))) + +;----------------------------------------------------------------------------- +; Money +; +; implementation of https://github.com/googleapis/googleapis/blob/master/google/type/money.proto +;----------------------------------------------------------------------------- +(defrecord Money [currency_code units nanos] + pb/Writer + + (serialize [this os] + (write-String 1 (:currency_code this) os) + (write-Int64 2 (:units this) os) + (write-Int32 3 (:nanos this) os)) + + (length [this] + (+ + (size-String 1 (:currency_code this)) + (size-Int64 2 (:units this)) + (size-Int32 3 (:nanos this))))) + +(def Money-defaults {:currency_code "" :units 0 :nanos 0}) + +(defn cis->Money + "CodedInputStream to Money" + [is] + (->> (tag-map + (fn [tag index] + (case index + 1 [:currency_code (cis->String is)] + 2 [:units (cis->Int64 is)] + 3 [:nanos (cis->Int32 is)] + [index (cis->undefined tag is)])) + is) + (merge Money-defaults) + (map->Money))) + +(defn ecis->Money + "Embedded CodedInputStream to Money" + [is] + (cis->embedded cis->Money is)) + +(defn new-Money + "Creates a new instance from a map, similar to map->Money except that + it properly accounts for nested messages, when applicable. + " + [init] + (-> (merge Money-defaults init) + (map->Money))) + +(defn pb->Money + "Protobuf to Money" + [input] + (-> input + CodedInputStream/newInstance + cis->Money)) + +;----------------------------------------------------------------------------- +; SimpleRepeated +;----------------------------------------------------------------------------- +(defrecord SimpleRepeated [data] + pb/Writer + + (serialize [this os] + (write-repeated write-Int32 1 (:data this) os)) + + (length [this] + (size-repeated size-Int32 1 (:data this)))) + +(def SimpleRepeated-defaults {:data []}) + +(defn cis->SimpleRepeated + "CodedInputStream to SimpleRepeated" + [is] + (->> (tag-map + (fn [tag index] + (case index + 1 [:data (cis->packablerepeated tag cis->Int32 is)] + + [index (cis->undefined tag is)])) + is) + (merge SimpleRepeated-defaults) + (map->SimpleRepeated))) + +(defn ecis->SimpleRepeated + "Embedded CodedInputStream to SimpleRepeated" + [is] + (cis->embedded cis->SimpleRepeated is)) + +(defn new-SimpleRepeated + "Creates a new instance from a map, similar to map->SimpleRepeated except that + it properly accounts for nested messages, when applicable. + " + [init] + (-> (merge SimpleRepeated-defaults init) + (map->SimpleRepeated))) + +(defn pb->SimpleRepeated + "Protobuf to SimpleRepeated" + [input] + (-> input + CodedInputStream/newInstance + cis->SimpleRepeated)) + +;----------------------------------------------------------------------------- +; SimpleString +;----------------------------------------------------------------------------- +(defrecord SimpleString [s] + pb/Writer + + (serialize [this os] + (write-String 1 {:optimize true} (:s this) os)) + + (length [this] + (size-String 1 {:optimize true} (:s this)))) + +(def SimpleString-defaults {:s ""}) + +(defn cis->SimpleString + "CodedInputStream to SimpleString" + [is] + (->> (tag-map + (fn [tag index] + (case index + 1 [:s (cis->String is)] + + [index (cis->undefined tag is)])) + is) + (merge SimpleString-defaults) + (map->SimpleString))) + +(defn ecis->SimpleString + "Embedded CodedInputStream to SimpleString" + [is] + (cis->embedded cis->SimpleString is)) + +(defn new-SimpleString + "Creates a new instance from a map, similar to map->SimpleString except that + it properly accounts for nested messages, when applicable. + " + [init] + (-> (merge SimpleString-defaults init) + (map->SimpleString))) + +(defn pb->SimpleString + "Protobuf to SimpleString" + [input] + (-> input + CodedInputStream/newInstance + cis->SimpleString)) + +;----------------------------------------------------------------------------- +; AllThingsMap-MSimpleEntry +;----------------------------------------------------------------------------- +(defrecord AllThingsMap-MSimpleEntry [key value] + pb/Writer + + (serialize [this os] + (write-String 1 {:optimize true} (:key this) os) + (write-Int32 2 {:optimize true} (:value this) os)) + + (length [this] + (+ + (size-String 1 {:optimize true} (:key this)) + (size-Int32 2 {:optimize true} (:value this))))) + +(def AllThingsMap-MSimpleEntry-defaults {:key "" :value 0}) + +(defn cis->AllThingsMap-MSimpleEntry + "CodedInputStream to AllThingsMap-MSimpleEntry" + [is] + (->> (tag-map + (fn [tag index] + (case index + 1 [:key (cis->String is)] + 2 [:value (cis->Int32 is)] + + [index (cis->undefined tag is)])) + is) + (merge AllThingsMap-MSimpleEntry-defaults) + (map->AllThingsMap-MSimpleEntry))) + +(defn ecis->AllThingsMap-MSimpleEntry + "Embedded CodedInputStream to AllThingsMap-MSimpleEntry" + [is] + (cis->embedded cis->AllThingsMap-MSimpleEntry is)) + +(defn new-AllThingsMap-MSimpleEntry + "Creates a new instance from a map, similar to map->AllThingsMap-MSimpleEntry except that + it properly accounts for nested messages, when applicable. + " + [init] + (-> (merge AllThingsMap-MSimpleEntry-defaults init) + (map->AllThingsMap-MSimpleEntry))) + +(defn pb->AllThingsMap-MSimpleEntry + "Protobuf to AllThingsMap-MSimpleEntry" + [input] + (-> input + CodedInputStream/newInstance + cis->AllThingsMap-MSimpleEntry)) + +;----------------------------------------------------------------------------- +; AllThingsMap-MComplexEntry +;----------------------------------------------------------------------------- +(defrecord AllThingsMap-MComplexEntry [key value] + pb/Writer + + (serialize [this os] + (write-String 1 {:optimize true} (:key this) os) + (write-embedded 2 (:value this) os)) + + (length [this] + (+ + (size-String 1 {:optimize true} (:key this)) + (size-embedded 2 (:value this))))) + +(def AllThingsMap-MComplexEntry-defaults {:key ""}) + +(defn cis->AllThingsMap-MComplexEntry + "CodedInputStream to AllThingsMap-MComplexEntry" + [is] + (->> (tag-map + (fn [tag index] + (case index + 1 [:key (cis->String is)] + 2 [:value (ecis->SimpleString is)] + + [index (cis->undefined tag is)])) + is) + (merge AllThingsMap-MComplexEntry-defaults) + (map->AllThingsMap-MComplexEntry))) + +(defn ecis->AllThingsMap-MComplexEntry + "Embedded CodedInputStream to AllThingsMap-MComplexEntry" + [is] + (cis->embedded cis->AllThingsMap-MComplexEntry is)) + +(defn new-AllThingsMap-MComplexEntry + "Creates a new instance from a map, similar to map->AllThingsMap-MComplexEntry except that + it properly accounts for nested messages, when applicable. + " + [init] + (-> (merge AllThingsMap-MComplexEntry-defaults init) + (cond-> (contains? init :value) (update :value new-SimpleString)) + (map->AllThingsMap-MComplexEntry))) + +(defn pb->AllThingsMap-MComplexEntry + "Protobuf to AllThingsMap-MComplexEntry" + [input] + (-> input + CodedInputStream/newInstance + cis->AllThingsMap-MComplexEntry)) + +;----------------------------------------------------------------------------- +; AllThingsMap +;----------------------------------------------------------------------------- +(defrecord AllThingsMap [s i mSimple mComplex sSimple oe] + pb/Writer + + (serialize [this os] + (write-String 1 {:optimize true} (:s this) os) + (write-Int32 2 {:optimize true} (:i this) os) + (write-map new-AllThingsMap-MSimpleEntry 3 (:mSimple this) os) + (write-map new-AllThingsMap-MComplexEntry 4 (:mComplex this) os) + (write-embedded 5 (:sSimple this) os)) + + (length [this] + (+ + (size-String 1 {:optimize true} (:s this)) + (size-Int32 2 {:optimize true} (:i this)) + (size-map new-AllThingsMap-MSimpleEntry 3 (:mSimple this)) + (size-map new-AllThingsMap-MComplexEntry 4 (:mComplex this)) + (size-embedded 5 (:sSimple this))))) + +(def AllThingsMap-defaults {:s "" :i 0 :mSimple [] :mComplex []}) + +(defn cis->AllThingsMap + "CodedInputStream to AllThingsMap" + [is] + (->> (tag-map + (fn [tag index] + (case index + 1 [:s (cis->String is)] + 2 [:i (cis->Int32 is)] + 3 [:mSimple (cis->map ecis->AllThingsMap-MSimpleEntry is)] + 4 [:mComplex (cis->map ecis->AllThingsMap-MComplexEntry is)] + 5 [:sSimple (ecis->SimpleString is)] + + [index (cis->undefined tag is)])) + is) + (merge AllThingsMap-defaults) + (map->AllThingsMap))) + +(defn ecis->AllThingsMap + "Embedded CodedInputStream to AllThingsMap" + [is] + (cis->embedded cis->AllThingsMap is)) + +(defn new-AllThingsMap + "Creates a new instance from a map, similar to map->AllThingsMap except that + it properly accounts for nested messages, when applicable. + " + [init] + (-> (merge AllThingsMap-defaults init) + (cond-> (contains? init :sSimple) (update :sSimple new-SimpleString)) + (map->AllThingsMap))) + +(defn pb->AllThingsMap + "Protobuf to AllThingsMap" + [input] + (-> input + CodedInputStream/newInstance + cis->AllThingsMap)) + diff --git a/test/protojure/grpc_test.clj b/test/protojure/grpc_test.clj new file mode 100644 index 0000000..ed33399 --- /dev/null +++ b/test/protojure/grpc_test.clj @@ -0,0 +1,455 @@ +;; Copyright © 2019 State Street Bank and Trust Company. All rights reserved +;; +;; SPDX-License-Identifier: Apache-2.0 + +(ns protojure.grpc-test + (:require [clojure.test :refer :all] + [clojure.string :as string] + [clojure.core.async :refer [!! ! go go-loop] :as async] + [promesa.core :as p] + [io.pedestal.http :as pedestal] + [io.pedestal.http.body-params :as body-params] + [taoensso.timbre :as log] + [taoensso.timbre.appenders.core :as appenders] + [taoensso.timbre.tools.logging :refer [use-timbre]] + [protojure.pedestal.core :as protojure.pedestal] + [protojure.pedestal.routes :as pedestal.routes] + [protojure.grpc.client.api :as grpc] + [protojure.grpc.client.providers.http2 :as grpc.http2] + [protojure.internal.grpc.client.providers.http2.jetty :as jetty-client] + [protojure.grpc.client.utils :as client.utils] + [protojure.test.utils :as test.utils :refer [data-equal?]] + [example.types :as example] + [example.hello :refer [new-HelloRequest pb->HelloRequest new-HelloReply pb->HelloReply]] + [example.hello.Greeter :as greeter]) + (:refer-clojure :exclude [resolve])) + +(log/set-config! {:level :trace + :ns-whitelist ["protojure.*"] + :appenders {:println (appenders/println-appender {:stream :auto})}}) + +(use-timbre) + +;;----------------------------------------------------------------------------- +;; Data +;;----------------------------------------------------------------------------- +(defonce test-env (atom {})) + +(def test-trailers {"foo" "baz" + "bar" "bat"}) + +;;----------------------------------------------------------------------------- +;; Mock endpoint +;;----------------------------------------------------------------------------- +(defn- echo [{:keys [body] :as request}] + {:status 200 :body body}) + +(defn- get-trailers [_] + {:status 200 :trailers test-trailers :body "OK"}) + +(defn- get-async [_] + (let [body-ch (async/chan 1) + trailers-ch (async/promise-chan)] + (go + (dotimes [_ 10] + (! body-ch (.getBytes "OK"))) + (async/close! body-ch) + (! trailers-ch test-trailers)) + {:status 200 :trailers trailers-ch :body body-ch})) + +(defn- grpc-echo [{:keys [body] {:strs [grpc-encoding]} :headers :as request}] + {:status 200 + :headers {"grpc-encoding" grpc-encoding + "content-type" "application/grpc+proto"} + :body body + :trailers {"grpc-status" 0 "grpc-message" "Got it!"}}) + +(defn- grpc-missing-trailers [{:keys [body] :as request}] + {:status 200 :body "OK"}) + +(defn- grpc-failing-status [{:keys [body] :as request}] + {:status 200 :body "Permission Denied!" :trailers {"grpc-status" 7 "grpc-message" "Permission Denied"}}) + +(defn- grpc-invalid-status [{:keys [body] :as request}] + {:status 200 :body "I'm not valid!" :trailers {"grpc-status" "bad"}}) + +(defn- grpc-bad-encoding [{:keys [body] :as request}] + {:status 200 + :headers {"grpc-encoding" "bad-codec" + "content-type" "application/grpc+proto"} + :body (byte-array [1 0 0 0 4 0 0 0 1]) + :trailers {"grpc-status" 0 "grpc-message" "BAR"}}) + +(defn generic-mock-routes [interceptors] + [["/echo" :post (conj interceptors `echo)] + ["/trailers" :get (conj interceptors `get-trailers)] + ["/async" :get (conj interceptors `get-async)] + ["/protojure.http2-test/Echo" :post (conj interceptors `grpc-echo)] + ["/protojure.http2-test/MissingTrailers" :post (conj interceptors `grpc-missing-trailers)] + ["/protojure.http2-test/FailingStatus" :post (conj interceptors `grpc-failing-status)] + ["/protojure.http2-test/InvalidStatus" :post (conj interceptors `grpc-invalid-status)] + ["/protojure.http2-test/BadEncoding" :post (conj interceptors `grpc-bad-encoding)]]) + +;;----------------------------------------------------------------------------- +;; "Greeter" service endpoint +;;----------------------------------------------------------------------------- +(deftype Greeter [] + greeter/Service + (SayHello + [this {{:keys [name]} :grpc-params :as request}] + {:status 200 + :body {:message (str "Hello, " name)}}) + (SayRepeatHello + [this {{:keys [name]} :grpc-params :as request}] + (let [resp-chan (:grpc-out request)] + (go + (dotimes [_ 3] + (>! resp-chan {:message (str "Hello, " name)})) + (async/close! resp-chan)) + {:status 200 + :body resp-chan})) + (SayHelloOnDemand + [this {:keys [grpc-params] :as request}] + (let [out-chan (:grpc-out request)] + (go-loop [name (:name (! out-chan {:message (str "Hello, " name)}) + (recur (:name (tablesyntax {:rpc-metadata greeter/rpc-metadata + :interceptors interceptors + :callback-context (Greeter.)})) + +(defn routes [interceptors] + (concat + (generic-mock-routes interceptors) + (service-mock-routes interceptors))) + +;;----------------------------------------------------------------------------- +;; Utilities +;;----------------------------------------------------------------------------- +(defn service-url + [& rest] + (apply str "http://localhost:" (:port @test-env) rest)) + +(defn- run!-first [coll f] + (run! f coll)) + +(defn- grpc-connect + ([] (grpc-connect (:port @test-env))) + ([port] + @(grpc.http2/connect {:uri (str "http://localhost:" port) :content-coding "gzip"}))) + +;;----------------------------------------------------------------------------- +;; Scaletest Assemblies +;;----------------------------------------------------------------------------- +(defn- scaletest-xmit [{:keys [input]}] + (>!! input {:name "World"})) + +(defn- scaletest-recv [{:keys [output]}] + (let [result (HelloReply :ch output}}] + + {:input input + :output output + :client client + :request (grpc/invoke client desc)})) + +(defn streaming-scaletest + ([parallelism] (streaming-scaletest parallelism (partial identity (:grpc-client @test-env)))) + ([parallelism client-fn] + (doto (doall (repeatedly parallelism #(streaming-scaletest-invoke client-fn))) + (run!-first scaletest-xmit) + (run!-first scaletest-recv) + (run!-first scaletest-close) + (run!-first scaletest-wait)))) + +;;----------------------------------------------------------------------------- +;; Client Scaletest +;;----------------------------------------------------------------------------- +(defn client-scaletest [parallelism] + (doto (streaming-scaletest parallelism grpc-connect) + (run!-first scaletest-disconnect))) + +;;----------------------------------------------------------------------------- +;; Unary Scaletest +;;----------------------------------------------------------------------------- +(defn- unary-scaletest-invoke [] + (let [input (async/chan 1) + output (async/chan 1) + client (:grpc-client @test-env) + desc {:service "example.hello.Greeter" + :method "SayHello" + :input {:f new-HelloRequest :ch input} + :output {:f pb->HelloReply :ch output}}] + + (>!! input {:name "World"}) + (async/close! input) + + {:input input + :output output + :client client + :request (grpc/invoke client desc)})) + +(defn unary-scaletest [parallelism] + (doto (doall (repeatedly parallelism unary-scaletest-invoke)) + (run!-first scaletest-recv) + (run!-first scaletest-wait))) + +;;------------------------------------------------------------------------------------ +;; Synchronous send-request helpers +;;------------------------------------------------------------------------------------ +(defn- receive-metadata [ch] + (p/promise + (fn [resolve reject] + (go-loop [response {}] + (if-let [data (! ic body)) + (async/close! ic)) + + @(-> (p/all [(jetty-client/send-request context (assoc request :input-ch ic :meta-ch mc :output-ch oc)) + (receive-metadata mc) + (receive-body oc)]) + (p/then (fn [[_ response body]] + (assoc response :body body)))))) + +;;----------------------------------------------------------------------------- +;; Fixtures +;;----------------------------------------------------------------------------- +(defn create-service [] + (let [port (test.utils/get-free-port) + interceptors [(body-params/body-params) + pedestal/html-body] + server-params {:env :prod + ::pedestal/routes (into #{} (routes interceptors)) + ::pedestal/port port + + ::pedestal/type protojure.pedestal/config + ::pedestal/chain-provider protojure.pedestal/provider} + client-params {:port port}] + + (let [server (test.utils/start-pedestal-server server-params) + client @(jetty-client/connect client-params) + grpc-client (grpc-connect port)] + (swap! test-env assoc :port port :server server :client client :grpc-client grpc-client)))) + +(defn destroy-service [] + (swap! test-env update :grpc-client grpc/disconnect) + (swap! test-env update :client jetty-client/disconnect) + (swap! test-env update :server pedestal/stop)) + +(defn wrap-service [test-fn] + (create-service) + (test-fn) + (destroy-service)) + +(use-fixtures :once wrap-service) + +;;----------------------------------------------------------------------------- +;; Tests +;;----------------------------------------------------------------------------- +(deftest bad-address-check + (testing "Check that we behave rationally with a bad address" + (is (thrown? java.util.concurrent.ExecutionException + @(jetty-client/connect {:host "bad.example.com" :port 80}))))) + +(deftest echo-check + (testing "Check that basic connectivity works" + (let [client (:client @test-env) + input (.getBytes "ping") + result (send-request-sync client {:method "POST" :url (service-url "/echo") :body input}) + output (:body result)] + (is (-> result :status (= 200))) + (is (data-equal? input output))))) + +(deftest trailers-check + (testing "Check that trailers are delivered" + (let [client (:client @test-env) + result (send-request-sync client {:url (service-url "/trailers")}) + trailers (:trailers result)] + (is (-> result :status (= 200))) + (is (data-equal? trailers test-trailers))))) + +(deftest async-check + (testing "Check that async data is delivered" + (let [client (:client @test-env) + result (send-request-sync client {:url (service-url "/async")}) + trailers (:trailers result)] + (is (-> result :status (= 200))) + (is (-> result :body String. (= (string/join (repeat 10 "OK"))))) + (is (data-equal? trailers test-trailers))))) + +(deftest basic-grpc-check + (testing "Check that a round-trip GRPC request works" + (let [input-msg {:currency_code (apply str (repeat 20 "foobar")) :units 42 :nanos 750000000} + input (async/chan 16) + output (async/chan 16) + client (:grpc-client @test-env)] + + (go + (>! input input-msg) + (async/close! input)) + + @(-> (grpc/invoke client {:service "protojure.http2-test" + :method "Echo" + :input {:f example/new-Money :ch input} + :output {:f example/pb->Money :ch output}}) + (p/then (fn [{:keys [status] :as result}] + (is (= status 0)) + (let [output-msgs (take-while some? (repeatedly #( output-msgs count (= 1))) + (is (data-equal? input-msg (first output-msgs)))))))))) + +(deftest unary-grpc-check + (testing "Check that a round-trip unary GRPC request works" + (let [input (async/chan 1) + output (async/chan 16) + client (:grpc-client @test-env) + desc {:service "example.hello.Greeter" + :method "SayHello" + :input {:f new-HelloRequest :ch input} + :output {:f pb->HelloReply :ch output}}] + + @(-> (client.utils/send-unary-params input {:name "World"}) + (p/then (fn [_] (client.utils/invoke-unary client desc output))) + (p/then (fn [{:keys [message] :as result}] + (is (= message "Hello, World")))))))) + +(deftest streaming-grpc-check + (testing "Check that a round-trip streaming GRPC request works" + (let [repetitions 50 + input (async/chan repetitions) + output (async/chan repetitions) + client (:grpc-client @test-env) + desc {:service "example.hello.Greeter" + :method "SayHelloOnDemand" + :input {:f new-HelloRequest :ch input} + :output {:f pb->HelloReply :ch output}}] + + (async/onto-chan input (repeat repetitions {:name "World"})) + + @(-> (grpc/invoke client desc) + (p/then (fn [{:keys [status]}] + (is (= status 0)) + (let [result (take-while some? (repeatedly #( result count (= repetitions))) + (is (every? (partial data-equal? {:message "Hello, World"}) result))))))))) + +(deftest grpc-async-check + (testing "Check that an async GRPC request works" + (streaming-scaletest 1))) + +(def parallelism 100) + +(deftest grpc-streaming-scale-check + (testing "Check that parallel streaming GRPC requests may scale" + (streaming-scaletest parallelism))) + +(deftest grpc-client-scale-check + (testing "Check that GRPC requests may scale when arriving from distinct clients" + (client-scaletest parallelism))) + +(deftest grpc-unary-scale-check + (testing "Check that parallel unary GRPC requests may scale" + (unary-scaletest parallelism))) + +(deftest bad-grpc-check + (testing "Check that a bogus GRPC request throws an exception" + (let [client (:grpc-client @test-env)] + (is (thrown? java.util.concurrent.ExecutionException + @(grpc/invoke client {:service "protojure.unknown-service" + :method "UnknownMethod"})))))) + +(deftest bad-grpc-trailers-check + (testing "Check that a bogus GRPC status response throws an exception" + (let [client (:grpc-client @test-env)] + (is (thrown? java.util.concurrent.ExecutionException + @(grpc/invoke client {:service "protojure.http2-test" + :method "MissingTrailers"})))))) + +(deftest grpc-failing-status-check + (testing "Check that a failing GRPC status response throws an exception" + (let [client (:grpc-client @test-env)] + (is (thrown? java.util.concurrent.ExecutionException + @(grpc/invoke client {:service "protojure.http2-test" + :method "FailingStatus"})))))) + +(deftest grpc-invalid-status-check + (testing "Check that an invalid GRPC status response throws an exception" + (let [client (:grpc-client @test-env)] + (is (thrown? java.util.concurrent.ExecutionException + @(grpc/invoke client {:service "protojure.http2-test" + :method "InvalidStatus"})))))) + +(deftest bad-grpc-encoding-check + (testing "Check that a GRPC status response with a bad encoding-type throws an exception" + (let [client (:grpc-client @test-env) + output (async/chan 16)] + (is (thrown? java.util.concurrent.ExecutionException + @(grpc/invoke client {:service "protojure.http2-test" + :method "BadEncoding" + :output {:f example/pb->Money :ch output}})))))) + +(deftest grpc-route-creation-test + (testing "Check that protoc generated fact is accurately converted to route(s)") + (let [routes (pedestal.routes/->tablesyntax {:rpc-metadata greeter/rpc-metadata})] + (clojure.pprint/pprint routes) + (is (= (map first routes) + (seq ["/example.hello.Greeter/SayHello" + "/example.hello.Greeter/SayRepeatHello" + "/example.hello.Greeter/SayHelloAfterDelay" + "/example.hello.Greeter/SayHelloOnDemand" + "/example.hello.Greeter/SayHelloError"]))))) \ No newline at end of file diff --git a/test/protojure/grpc_web_test.clj b/test/protojure/grpc_web_test.clj new file mode 100644 index 0000000..d0830d6 --- /dev/null +++ b/test/protojure/grpc_web_test.clj @@ -0,0 +1,90 @@ +;; Copyright © 2019 State Street Bank and Trust Company. All rights reserved +;; +;; SPDX-License-Identifier: Apache-2.0 + +(ns protojure.grpc-web-test + (:require [clojure.test :refer :all] + [io.pedestal.test :refer [response-for]] + [protojure.pedestal.core :as protojure.pedestal] + [protojure.pedestal.interceptors.grpc-web :as grpc-web] + [io.pedestal.http :as pedestal] + [io.pedestal.http.body-params :as body-params] + [example.types :as example] + [protojure.protobuf :as pb] + [clojure.data.codec.base64 :as b64])) + +(defn grpc-echo [{:keys [body] :as request}] + {:status 200 + :body (example/pb->Money body) + :trailers {"grpc-status" 0 "grpc-message" "Got it!"}}) + +(def interceptors [(body-params/body-params) + grpc-web/proxy]) + +(def routes [["/" :get (conj interceptors `grpc-echo)]]) + +(def service (let [service-params {:env :prod + ::pedestal/routes (into #{} routes) + ::pedestal/type protojure.pedestal/config + ::pedestal/chain-provider protojure.pedestal/provider}] + (:io.pedestal.http/service-fn (io.pedestal.http/create-servlet service-params)))) + +(deftest grpc-web-text-check + (testing "Check that a round-trip GRPC request works" + (let [input-msg (pb/->pb (example/new-Money {:currency_code (apply str (repeat 20 "foobar")) :units 42 :nanos 750000000}))] + (is + (= + (with-out-str (pr (example/pb->Money input-msg))) + (:body (response-for + service + :get "/" + :headers {"Content-Type" "application/grpc-web-text"} + :body (clojure.java.io/input-stream (b64/encode input-msg))))))))) + +(deftest grpc-web-check + (testing "Check that a round-trip GRPC request works" + (let [input-msg (pb/->pb (example/new-Money {:currency_code (apply str (repeat 20 "foobar")) :units 42 :nanos 750000000}))] + (is + (= + (with-out-str (pr (example/pb->Money input-msg))) + (:body (response-for + service + :get "/" + :headers {"Content-Type" "application/grpc-web"} + :body (clojure.java.io/input-stream input-msg)))))))) + +(deftest grpc-web-proto-check + (testing "Check that a round-trip GRPC request works" + (let [input-msg (pb/->pb (example/new-Money {:currency_code (apply str (repeat 20 "foobar")) :units 42 :nanos 750000000}))] + (is + (= + (with-out-str (pr (example/pb->Money input-msg))) + (:body (response-for + service + :get "/" + :headers {"Content-Type" "application/grpc-web+proto"} + :body (clojure.java.io/input-stream input-msg)))))))) + +(deftest grpc-web-text-proto-check + (testing "Check that a round-trip GRPC request works" + (let [input-msg (pb/->pb (example/new-Money {:currency_code (apply str (repeat 20 "foobar")) :units 42 :nanos 750000000}))] + (is + (= + (with-out-str (pr (example/pb->Money input-msg))) + (:body (response-for + service + :get "/" + :headers {"Content-Type" "application/grpc-web-text+proto"} + :body (clojure.java.io/input-stream (b64/encode input-msg))))))))) + +(deftest grpc-web-no-header-match-check + (testing "Check that a round-trip GRPC request works" + (let [input-msg (pb/->pb (example/new-Money {:currency_code (apply str (repeat 20 "foobar")) :units 42 :nanos 750000000}))] + (is + (= + (with-out-str (pr (example/pb->Money input-msg))) + (:body (response-for + service + :get "/" + :headers {"Content-Type" "application/grpc"} + :body (clojure.java.io/input-stream input-msg)))))))) diff --git a/test/protojure/iostream_test.clj b/test/protojure/iostream_test.clj new file mode 100644 index 0000000..225d73e --- /dev/null +++ b/test/protojure/iostream_test.clj @@ -0,0 +1,78 @@ +;; Copyright © 2019 State Street Bank and Trust Company. All rights reserved +;; +;; SPDX-License-Identifier: Apache-2.0 + +(ns protojure.iostream-test + (:require [clojure.test :refer :all] + [clojure.core.async :as async] + [clojure.java.io :as io] + [clojure.string :as string] + [clojure.core.async :refer [ (.read stream) (= -1))) + (is (-> (.available stream) (= 0)))))) + +(deftest check-input-available + (testing "Verify that our input stream reports available bytes properly" + (let [input (async/chan 64) + stream (InputStream. {:ch input}) + count 20] + (run! (partial async/put! input) (repeat count 42)) + (is (-> (.available stream) (= count)))))) + +(deftest check-bufferless-available + (testing "Verify that a bufferless core.async channel returns '1' for (.available)" + (let [input (async/chan) + stream (InputStream. {:ch input})] + (is (-> (.available stream) (= 1)))))) + +(deftest check-timeout + (testing "Verify that our input stream's timeout mechanism works" + (let [input (async/chan 64) + stream (InputStream. {:ch input :tmo 100})] + (is (thrown? clojure.lang.ExceptionInfo (.read stream)))))) + +(deftest check-array-read + (testing "Verify that we can read an array in one call" + (let [ch (async/chan 64) + stream (InputStream. {:ch ch}) + len 20 + output (byte-array len)] + (run! (partial async/put! ch) (repeat len 42)) + (.read stream output) + (is (= (count output) len)) + (doseq [x output] + (is (= x 42)))))) + +(defn- take-available [ch] + (take-while some? (repeatedly #( result count (= repetitions))) + (doseq [v result] + (is (= phrase v))))))) \ No newline at end of file diff --git a/test/protojure/pedestal_test.clj b/test/protojure/pedestal_test.clj new file mode 100644 index 0000000..4391c59 --- /dev/null +++ b/test/protojure/pedestal_test.clj @@ -0,0 +1,137 @@ +;; Copyright © 2019 State Street Bank and Trust Company. All rights reserved +;; +;; SPDX-License-Identifier: Apache-2.0 + +(ns protojure.pedestal-test + (:require [clojure.test :refer :all] + [clojure.core.async :as async :refer [go >! !!]] + [io.pedestal.http :as http] + [io.pedestal.http.body-params :as body-params] + [protojure.pedestal.core :as protojure.pedestal] + [protojure.test.utils :as test.utils] + [clj-http.client :as client] + [clojure.java.io :as io])) + +;;----------------------------------------------------------------------------- +;; Data +;;----------------------------------------------------------------------------- +(defonce test-svc (atom {})) + +(defn- get-healthz [_] + {:status 200 + :headers {"Content-Type" "application/text"} + :trailers {"Meta" "test"} + :body "OK"}) + +(defn- echo-params [{{:keys [content]} :params}] + {:status 200 :body content}) + +(defn- echo-body [{:keys [body]}] + {:status 200 :body body}) + +(defn- echo-async [{{:keys [content]} :params}] + (let [ch (async/chan 1)] + (go + (>! ch (byte-array (map byte content))) + (async/close! ch)) + {:status 200 :body ch})) + +(defn- testdata-download [_] + {:status 200 :body (io/as-file (io/resource "testdata.txt"))}) + +(defn routes [interceptors] + [["/healthz" :get (conj interceptors `get-healthz)] + ["/echo" :get (conj interceptors `echo-params)] + ["/echo" :post (conj interceptors `echo-body)] + ["/echo/async" :get (conj interceptors `echo-async)] + ["/testdata" :get (conj interceptors `testdata-download)]]) + +;;----------------------------------------------------------------------------- +;; Utilities +;;----------------------------------------------------------------------------- + +(defn service-url + [& rest] + (apply str "http://localhost:" (:port @test-svc) rest)) + +(defn service-url-ssl + [& rest] + (apply str "https://localhost:" (:ssl-port @test-svc) rest)) + +;;----------------------------------------------------------------------------- +;; Fixtures +;;----------------------------------------------------------------------------- +(defn create-service [] + (let [port (test.utils/get-free-port) + ssl-port (test.utils/get-free-port) + interceptors [(body-params/body-params) + http/html-body] + desc {:env :prod + ::http/routes (into #{} (routes interceptors)) + ::http/port port + + ::http/type protojure.pedestal/config + ::http/chain-provider protojure.pedestal/provider + + ::http/container-options {:ssl-port ssl-port + ; keystore may be either string denoting file path (relative or + ; absolute) or actual KeyStore instance + :keystore (io/resource "https/keystore.jks") + :key-password "password"}}] + + (let [server (http/create-server desc)] + (http/start server) + (swap! test-svc assoc :port port :ssl-port ssl-port :server server)))) + +(defn destroy-service [] + (swap! test-svc update :server http/stop)) + +(defn wrap-service [test-fn] + (create-service) + (test-fn) + (destroy-service)) + +(use-fixtures :once wrap-service) + +;;----------------------------------------------------------------------------- +;; Tests +;;----------------------------------------------------------------------------- +(deftest healthz-check + (testing "Check that basic connectivity works" + (is (-> (client/get (service-url "/healthz")) :body (= "OK"))))) + +(deftest ssl-check + (testing "Check that SSL works" + (is (-> (client/get (service-url-ssl "/healthz") {:insecure? true}) :body (= "OK"))))) + +(deftest query-param-check + (testing "Check that query-parameters work" + (is (-> (client/get (service-url "/echo") {:query-params {"content" "FOO"}}) :body (= "FOO"))))) + +(deftest body-check + (testing "Check that response/request body work" + (is (-> (client/post (service-url "/echo") {:body "BAR"}) :body (= "BAR"))))) + +(deftest async-check + (testing "Check that async-body works" + (is (-> (client/get (service-url "/echo/async") {:query-params {"content" "FOO"}}) :body (= "FOO"))))) + +(deftest file-download-check + (testing "Check that we can download a file" + (is (->> (client/get (service-url "/testdata")) :body (re-find #"testdata!") some?)))) + +(deftest notfound-check + (testing "Check that a request for an invalid resource correctly propagates the error code" + (is (thrown? clojure.lang.ExceptionInfo (client/get (service-url "/invalid")))))) + +(deftest read-check + (testing "Check that bytes entered to channel are properly read from InputStream" + (let [test-string "Hello" + test-channel (async/chan 8096) + in-stream (protojure.pedestal.io.InputStream. test-channel) + buff (byte-array 5)] + (doseq [b (.getBytes test-string)] + (>!! test-channel b)) + (async/close! test-channel) + (.read in-stream buff 0 5) + (is (= "Hello" (String. buff)))))) \ No newline at end of file diff --git a/test/protojure/protobuf_test.clj b/test/protojure/protobuf_test.clj new file mode 100644 index 0000000..61fdf7f --- /dev/null +++ b/test/protojure/protobuf_test.clj @@ -0,0 +1,303 @@ +;; Copyright © 2019 State Street Bank and Trust Company. All rights reserved +;; +;; SPDX-License-Identifier: Apache-2.0 + +(ns protojure.protobuf-test + (:require [clojure.test :refer :all] + [clojure.core.async :refer [!! ! go] :as async] + [protojure.protobuf.serdes :as serdes] + [protojure.protobuf :refer [->pb]] + [protojure.grpc.codec.lpm :as lpm] + [protojure.grpc.codec.compression :as compression] + [protojure.test.utils :refer [data-equal?]] + [promesa.core :as p] + [example.types :as example]) + (:import (com.google.protobuf CodedOutputStream + CodedInputStream) + (java.io ByteArrayInputStream) + (org.apache.commons.io.input CloseShieldInputStream) + (org.apache.commons.io.output CloseShieldOutputStream)) + (:refer-clojure :exclude [resolve])) + +;;----------------------------------------------------------------------------- +;; Helper functions +;;----------------------------------------------------------------------------- + +(defn- fns [type] + (mapv #(clojure.core/resolve (symbol "protojure.protobuf.serdes" (str % type))) + ["size-" "write-" "cis->"])) + +(defn- resolve-fns [type] + (let [[sizefn writefn parsefn] (fns type)] + {:sizefn sizefn :writefn writefn :parsefn parsefn})) + +(defn- pbverify + "End to end serdes testing for a specific message" + [newf pb->f data] + (let [input (newf data) + output (-> input ->pb pb->f)] + (is (data-equal? input output)))) + +(defn- with-buffer + "Invokes 'f' with a fully formed buffered output-stream and returns the bytes" + [len f] + (let [buf (byte-array len) + os (CodedOutputStream/newInstance buf)] + (f os) + (.flush os) + buf)) + +(defn- write [sizefn writefn tag value] + (let [len (sizefn tag value)] + (with-buffer len (partial writefn tag value)))) + +(defn- write-embedded [tag item] + (write serdes/size-embedded serdes/write-embedded tag item)) + +(defn- write-repeated [sizefn writefn tag items] + (let [len (serdes/size-repeated sizefn tag items)] + (with-buffer len (partial serdes/write-repeated writefn tag items)))) + +(defn- parse [^bytes buf readfn] + (let [is (CodedInputStream/newInstance buf)] + (.readTag is) + (readfn is))) + +(defn- parse-repeated [^bytes buf readfn packable? tag] + (let [is (CodedInputStream/newInstance buf) + f (if packable? (partial serdes/cis->packablerepeated tag) serdes/cis->repeated)] + (serdes/tag-map + (fn [tag index] + [index (f readfn is)]) + is))) + +(defn- byte-string [x] (byte-array (mapv byte x))) + +(defn- test-repeated [data] + (-> (mapv byte data) + (byte-array) + (example/pb->SimpleRepeated) + (:data))) + +(defn- repeat-num + "Create an range of 'n' contiguous values from 'input'" + [n input] + (take n (iterate inc input))) + +;;----------------------------------------------------------------------------- +;; Test data +;;----------------------------------------------------------------------------- + +(def tag 0x80) ;; random tag to use + +(def test-msg {:currency_code "USD" :units 42 :nanos 750000000}) +(def long-test-msg (update test-msg :currency_code (fn [x] (apply str (repeat 20 x))))) + +(def int-scalars + ["Enum" + "Fixed32" + "Fixed64" + "Int32" + "Int64" + "SFixed32" + "SFixed64" + "SInt32" + "SInt64" + "UInt32" + "UInt64"]) + +(def float-scalars + ["Float" + "Double"]) + +(def _test-data + ;;------------------------------------------------------------------------------------ + ;; types input default packable? repeatfn + ;;------------------------------------------------------------------------------------ + [[int-scalars 42 0 true repeat-num] + [float-scalars 42.0 0.0 true repeat-num] + [["Bool"] true false true repeat] + [["String"] "hello" "" false repeat] + [["Bytes"] (byte-string "hello") (byte-array 0) false repeat]]) + +(def test-data + (flatten + (for [[types input default packable? repeatfn] _test-data] + (map (fn [type] {:type type :input input :default default :packable? packable? :repeatfn repeatfn}) types)))) + +;;----------------------------------------------------------------------------- +;; Validation helpers +;;----------------------------------------------------------------------------- + +(defn- validate-e2e + "validate that we can do a complete end-to-end serialize->deserialize cycle" + [{:keys [type input]}] + (let [{:keys [sizefn writefn parsefn]} (resolve-fns type) + output (-> (write sizefn writefn tag input) + (parse parsefn))] + + (is (data-equal? input output)))) + +(defn- validate-optimizer + "validate optimizer behavior. 'input' items should _never_ be elided, thus + they should always generate a positive length calculation. 'default' fields + however, are fields that are carrying default values. The optimizer should + detect this and elide them from the wire, resulting in a 0 length calc. For + good measure, we also fire up a (writefn) operation to a nil output stream. + A correct functioning optimizer will elide the write, resulting in no errors + even despite our bogus stream." + [{:keys [type input default]}] + (let [{:keys [writefn sizefn]} (resolve-fns type)] + (is (pos? (sizefn tag input))) + (is (zero? (sizefn tag default))) + (is (pos? (sizefn tag {:optimize false} default))) + (writefn tag default nil))) + +(defn- validate-repeated + [{:keys [type input packable? repeatfn]}] + (let [{:keys [sizefn writefn parsefn]} (resolve-fns type) + items (vec (repeatfn 10 input)) + output (-> (write-repeated sizefn writefn tag items) + (parse-repeated parsefn packable? tag) + (get tag))] + + (is (data-equal? items output)) + (is (zero? (serdes/size-repeated sizefn tag []))))) + +;; We add a silly codec named "mycustom" that does nothing. We use the CloseShieldXXX family +;; of proxy stream classes so that we pass the IO through, but bury the (.close) operation. This +;; codec is only useful for validating that a custom-codec actually works. +(def custom-codecs + (assoc compression/builtin-codecs + "mycustom" {:input #(CloseShieldInputStream. %) :output #(CloseShieldOutputStream. %)})) + +(defn- validate-lpm-msg + [input-ch output-ch input] + (>!! input-ch input) + (let [output ( {:codecs custom-codecs} + (cond-> (some? content-coding) (assoc :content-coding content-coding))) + tasks (p/all [(lpm/encode example/new-Money input wire options) + (lpm/decode example/pb->Money wire output options)])] + + (run! (partial validate-lpm-msg input output) (repeat 10 msg)) + (async/close! input) + @tasks)) + +(defn- validate-bad-codec + [msg content-coding] + (is (thrown? clojure.lang.ExceptionInfo (validate-lpm msg content-coding)))) + +;;----------------------------------------------------------------------------- +;; Tests +;;----------------------------------------------------------------------------- + +(deftest raw-e2e-test + (testing "Test each type round trip serialize->deserialize" + (run! validate-e2e test-data))) + +(deftest optimizer-test + (testing "Check that the optimizer skips default values" + (run! validate-optimizer test-data))) + +(deftest pb-e2e-test + (testing "End-to-end testing by processing arbitrary PB type" + (pbverify example/new-Money + example/pb->Money + test-msg))) + +(deftest repeated-test + (testing "Check that repeated values are handled properly" + (run! validate-repeated test-data))) + +;; Represent a 'repeated int32' wire representation in both +;; packed and unpacked format. For more details, see: +;; https://developers.google.com/protocol-buffers/docs/encoding#packed +(deftest packed-repeated-test + (testing "Testing repeated field decoding of packed structures" + (let [result (test-repeated [0xA 3 21 22 23 0xA 3 24 25 26])] ;; send the data in two chunks + (is (= (count result) 6)) + (is (data-equal? result [21 22 23 24 25 26]))))) + +(deftest unpacked-repeated-test + (testing "Testing repeated field decoding of unpacked structures" + (let [result (test-repeated [0x8 1 0x8 2 0x8 3])] + (is (= (count result) 3)) + (is (data-equal? result [1 2 3]))))) + +(deftest map-test + (testing "Test map serialization" + (pbverify example/new-AllThingsMap + example/pb->AllThingsMap + {:s "hello" + :i 42 + :mSimple {"k1" 42 "k2" 43} + :mComplex {"k1" {:s "v1"} "k2" {:s "v2"}} + :sSimple {:s "simple"}}))) + +(deftest embedded-test + (testing "Verify that we can embed a message in a coded stream" + (let [input (example/new-Money test-msg) + output (-> (write-embedded tag input) + (parse example/ecis->Money))] + (is (data-equal? input output))))) + +(deftest embedded-nil-test + (testing "Check that embedded but unset messages are handled properly" + (is (zero? (serdes/size-embedded tag nil))) + (serdes/write-embedded tag nil nil))) + +(deftest grpc-lpm-test + (testing "Verify that we can round-trip through the LPM logic with each compression type" + (let [codecs [nil "identity" "gzip" "deflate" "snappy" "mycustom"]] + (run! (partial validate-lpm long-test-msg) codecs) + (run! (partial validate-lpm test-msg) codecs)))) + +(deftest grpc-timeout-test + (testing "Verify that we correctly timeout on a stalled decode" + (let [input (async/chan 16384) + output (async/chan 64)] + (run! (fn [x] (async/put! input (byte x))) [0 0 0 0 4]) + (is (thrown? java.util.concurrent.ExecutionException @(lpm/decode example/pb->Money input output {:tmo 100})))))) + +(deftest grpc-bad-codec + (testing "Verify that we reject invalid codec types" + (run! (partial validate-bad-codec test-msg) ["bad-codec" 42 {}]))) + +(deftest grpc-bad-decode + (testing "Verify that we error decoding an invalid message" + (let [input (async/chan 16384) + output (async/chan 64) + pipeline (lpm/decode example/pb->Money input output {})] + + (go + (doseq [b (repeatedly 256000 #(unchecked-byte (rand-int 256)))] + (>! input b)) + (async/close! input)) + + (loop [] + (if-let [_ (!! input {:nanos "bad data"}) + (async/close! input)) + + (loop [] + (if-let [_ (