diff --git a/lavinmq/.gitignore b/lavinmq/.gitignore new file mode 100644 index 000000000..a40694355 --- /dev/null +++ b/lavinmq/.gitignore @@ -0,0 +1,15 @@ +/target +/classes +/checkouts +profiles.clj +pom.xml +pom.xml.asc +*.jar +*.class +/.lein-* +/.nrepl-port +/.prepl-port +.hgignore +.hg/ +.clj-kondo +.lsp diff --git a/lavinmq/CHANGELOG.md b/lavinmq/CHANGELOG.md new file mode 100644 index 000000000..d93b5c930 --- /dev/null +++ b/lavinmq/CHANGELOG.md @@ -0,0 +1,24 @@ +# Change Log +All notable changes to this project will be documented in this file. This change log follows the conventions of [keepachangelog.com](http://keepachangelog.com/). + +## [Unreleased] +### Changed +- Add a new arity to `make-widget-async` to provide a different widget shape. + +## [0.1.1] - 2025-01-23 +### Changed +- Documentation on how to make the widgets. + +### Removed +- `make-widget-sync` - we're all async, all the time. + +### Fixed +- Fixed widget maker to keep working when daylight savings switches over. + +## 0.1.0 - 2025-01-23 +### Added +- Files from the new template. +- Widget maker public API - `make-widget-sync`. + +[Unreleased]: https://sourcehost.site/your-name/jepsen.lavinmq/compare/0.1.1...HEAD +[0.1.1]: https://sourcehost.site/your-name/jepsen.lavinmq/compare/0.1.0...0.1.1 diff --git a/lavinmq/LICENSE b/lavinmq/LICENSE new file mode 100644 index 000000000..231512650 --- /dev/null +++ b/lavinmq/LICENSE @@ -0,0 +1,280 @@ +Eclipse Public License - v 2.0 + + THE ACCOMPANYING PROGRAM IS PROVIDED UNDER THE TERMS OF THIS ECLIPSE + PUBLIC LICENSE ("AGREEMENT"). ANY USE, REPRODUCTION OR DISTRIBUTION + OF THE PROGRAM CONSTITUTES RECIPIENT'S ACCEPTANCE OF THIS AGREEMENT. + +1. DEFINITIONS + +"Contribution" means: + + a) in the case of the initial Contributor, the initial content + Distributed under this Agreement, and + + b) in the case of each subsequent Contributor: + i) changes to the Program, and + ii) additions to the Program; + where such changes and/or additions to the Program originate from + and are Distributed by that particular Contributor. A Contribution + "originates" from a Contributor if it was added to the Program by + such Contributor itself or anyone acting on such Contributor's behalf. + Contributions do not include changes or additions to the Program that + are not Modified Works. + +"Contributor" means any person or entity that Distributes the Program. + +"Licensed Patents" mean patent claims licensable by a Contributor which +are necessarily infringed by the use or sale of its Contribution alone +or when combined with the Program. + +"Program" means the Contributions Distributed in accordance with this +Agreement. + +"Recipient" means anyone who receives the Program under this Agreement +or any Secondary License (as applicable), including Contributors. + +"Derivative Works" shall mean any work, whether in Source Code or other +form, that is based on (or derived from) the Program and for which the +editorial revisions, annotations, elaborations, or other modifications +represent, as a whole, an original work of authorship. + +"Modified Works" shall mean any work in Source Code or other form that +results from an addition to, deletion from, or modification of the +contents of the Program, including, for purposes of clarity any new file +in Source Code form that contains any contents of the Program. Modified +Works shall not include works that contain only declarations, +interfaces, types, classes, structures, or files of the Program solely +in each case in order to link to, bind by name, or subclass the Program +or Modified Works thereof. + +"Distribute" means the acts of a) distributing or b) making available +in any manner that enables the transfer of a copy. + +"Source Code" means the form of a Program preferred for making +modifications, including but not limited to software source code, +documentation source, and configuration files. + +"Secondary License" means either the GNU General Public License, +Version 2.0, or any later versions of that license, including any +exceptions or additional permissions as identified by the initial +Contributor. + +2. GRANT OF RIGHTS + + a) Subject to the terms of this Agreement, each Contributor hereby + grants Recipient a non-exclusive, worldwide, royalty-free copyright + license to reproduce, prepare Derivative Works of, publicly display, + publicly perform, Distribute and sublicense the Contribution of such + Contributor, if any, and such Derivative Works. + + b) Subject to the terms of this Agreement, each Contributor hereby + grants Recipient a non-exclusive, worldwide, royalty-free patent + license under Licensed Patents to make, use, sell, offer to sell, + import and otherwise transfer the Contribution of such Contributor, + if any, in Source Code or other form. This patent license shall + apply to the combination of the Contribution and the Program if, at + the time the Contribution is added by the Contributor, such addition + of the Contribution causes such combination to be covered by the + Licensed Patents. The patent license shall not apply to any other + combinations which include the Contribution. No hardware per se is + licensed hereunder. + + c) Recipient understands that although each Contributor grants the + licenses to its Contributions set forth herein, no assurances are + provided by any Contributor that the Program does not infringe the + patent or other intellectual property rights of any other entity. + Each Contributor disclaims any liability to Recipient for claims + brought by any other entity based on infringement of intellectual + property rights or otherwise. As a condition to exercising the + rights and licenses granted hereunder, each Recipient hereby + assumes sole responsibility to secure any other intellectual + property rights needed, if any. For example, if a third party + patent license is required to allow Recipient to Distribute the + Program, it is Recipient's responsibility to acquire that license + before distributing the Program. + + d) Each Contributor represents that to its knowledge it has + sufficient copyright rights in its Contribution, if any, to grant + the copyright license set forth in this Agreement. + + e) Notwithstanding the terms of any Secondary License, no + Contributor makes additional grants to any Recipient (other than + those set forth in this Agreement) as a result of such Recipient's + receipt of the Program under the terms of a Secondary License + (if permitted under the terms of Section 3). + +3. REQUIREMENTS + +3.1 If a Contributor Distributes the Program in any form, then: + + a) the Program must also be made available as Source Code, in + accordance with section 3.2, and the Contributor must accompany + the Program with a statement that the Source Code for the Program + is available under this Agreement, and informs Recipients how to + obtain it in a reasonable manner on or through a medium customarily + used for software exchange; and + + b) the Contributor may Distribute the Program under a license + different than this Agreement, provided that such license: + i) effectively disclaims on behalf of all other Contributors all + warranties and conditions, express and implied, including + warranties or conditions of title and non-infringement, and + implied warranties or conditions of merchantability and fitness + for a particular purpose; + + ii) effectively excludes on behalf of all other Contributors all + liability for damages, including direct, indirect, special, + incidental and consequential damages, such as lost profits; + + iii) does not attempt to limit or alter the recipients' rights + in the Source Code under section 3.2; and + + iv) requires any subsequent distribution of the Program by any + party to be under a license that satisfies the requirements + of this section 3. + +3.2 When the Program is Distributed as Source Code: + + a) it must be made available under this Agreement, or if the + Program (i) is combined with other material in a separate file or + files made available under a Secondary License, and (ii) the initial + Contributor attached to the Source Code the notice described in + Exhibit A of this Agreement, then the Program may be made available + under the terms of such Secondary Licenses, and + + b) a copy of this Agreement must be included with each copy of + the Program. + +3.3 Contributors may not remove or alter any copyright, patent, +trademark, attribution notices, disclaimers of warranty, or limitations +of liability ("notices") contained within the Program from any copy of +the Program which they Distribute, provided that Contributors may add +their own appropriate notices. + +4. COMMERCIAL DISTRIBUTION + +Commercial distributors of software may accept certain responsibilities +with respect to end users, business partners and the like. While this +license is intended to facilitate the commercial use of the Program, +the Contributor who includes the Program in a commercial product +offering should do so in a manner which does not create potential +liability for other Contributors. Therefore, if a Contributor includes +the Program in a commercial product offering, such Contributor +("Commercial Contributor") hereby agrees to defend and indemnify every +other Contributor ("Indemnified Contributor") against any losses, +damages and costs (collectively "Losses") arising from claims, lawsuits +and other legal actions brought by a third party against the Indemnified +Contributor to the extent caused by the acts or omissions of such +Commercial Contributor in connection with its distribution of the Program +in a commercial product offering. The obligations in this section do not +apply to any claims or Losses relating to any actual or alleged +intellectual property infringement. In order to qualify, an Indemnified +Contributor must: a) promptly notify the Commercial Contributor in +writing of such claim, and b) allow the Commercial Contributor to control, +and cooperate with the Commercial Contributor in, the defense and any +related settlement negotiations. The Indemnified Contributor may +participate in any such claim at its own expense. + +For example, a Contributor might include the Program in a commercial +product offering, Product X. That Contributor is then a Commercial +Contributor. If that Commercial Contributor then makes performance +claims, or offers warranties related to Product X, those performance +claims and warranties are such Commercial Contributor's responsibility +alone. Under this section, the Commercial Contributor would have to +defend claims against the other Contributors related to those performance +claims and warranties, and if a court requires any other Contributor to +pay any damages as a result, the Commercial Contributor must pay +those damages. + +5. NO WARRANTY + +EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, AND TO THE EXTENT +PERMITTED BY APPLICABLE LAW, THE PROGRAM IS PROVIDED ON AN "AS IS" +BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, EITHER EXPRESS OR +IMPLIED INCLUDING, WITHOUT LIMITATION, ANY WARRANTIES OR CONDITIONS OF +TITLE, NON-INFRINGEMENT, MERCHANTABILITY OR FITNESS FOR A PARTICULAR +PURPOSE. Each Recipient is solely responsible for determining the +appropriateness of using and distributing the Program and assumes all +risks associated with its exercise of rights under this Agreement, +including but not limited to the risks and costs of program errors, +compliance with applicable laws, damage to or loss of data, programs +or equipment, and unavailability or interruption of operations. + +6. DISCLAIMER OF LIABILITY + +EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, AND TO THE EXTENT +PERMITTED BY APPLICABLE LAW, NEITHER RECIPIENT NOR ANY CONTRIBUTORS +SHALL HAVE ANY LIABILITY FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING WITHOUT LIMITATION LOST +PROFITS), HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OR DISTRIBUTION OF THE PROGRAM OR THE +EXERCISE OF ANY RIGHTS GRANTED HEREUNDER, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGES. + +7. GENERAL + +If any provision of this Agreement is invalid or unenforceable under +applicable law, it shall not affect the validity or enforceability of +the remainder of the terms of this Agreement, and without further +action by the parties hereto, such provision shall be reformed to the +minimum extent necessary to make such provision valid and enforceable. + +If Recipient institutes patent litigation against any entity +(including a cross-claim or counterclaim in a lawsuit) alleging that the +Program itself (excluding combinations of the Program with other software +or hardware) infringes such Recipient's patent(s), then such Recipient's +rights granted under Section 2(b) shall terminate as of the date such +litigation is filed. + +All Recipient's rights under this Agreement shall terminate if it +fails to comply with any of the material terms or conditions of this +Agreement and does not cure such failure in a reasonable period of +time after becoming aware of such noncompliance. If all Recipient's +rights under this Agreement terminate, Recipient agrees to cease use +and distribution of the Program as soon as reasonably practicable. +However, Recipient's obligations under this Agreement and any licenses +granted by Recipient relating to the Program shall continue and survive. + +Everyone is permitted to copy and distribute copies of this Agreement, +but in order to avoid inconsistency the Agreement is copyrighted and +may only be modified in the following manner. The Agreement Steward +reserves the right to publish new versions (including revisions) of +this Agreement from time to time. No one other than the Agreement +Steward has the right to modify this Agreement. The Eclipse Foundation +is the initial Agreement Steward. The Eclipse Foundation may assign the +responsibility to serve as the Agreement Steward to a suitable separate +entity. Each new version of the Agreement will be given a distinguishing +version number. The Program (including Contributions) may always be +Distributed subject to the version of the Agreement under which it was +received. In addition, after a new version of the Agreement is published, +Contributor may elect to Distribute the Program (including its +Contributions) under the new version. + +Except as expressly stated in Sections 2(a) and 2(b) above, Recipient +receives no rights or licenses to the intellectual property of any +Contributor under this Agreement, whether expressly, by implication, +estoppel or otherwise. All rights in the Program not expressly granted +under this Agreement are reserved. Nothing in this Agreement is intended +to be enforceable by any entity that is not a Contributor or Recipient. +No third-party beneficiary rights are created under this Agreement. + +Exhibit A - Form of Secondary Licenses Notice + +"This Source Code may also be made available under the following +Secondary Licenses when the conditions for such availability set forth +in the Eclipse Public License, v. 2.0 are satisfied: GNU General Public +License as published by the Free Software Foundation, either version 2 +of the License, or (at your option) any later version, with the GNU +Classpath Exception which is available at +https://www.gnu.org/software/classpath/license.html." + + Simply including a copy of this Agreement, including this Exhibit A + is not sufficient to license the Source Code under Secondary Licenses. + + If it is not possible or desirable to put the notice in a particular + file, then You may include the notice in a location (such as a LICENSE + file in a relevant directory) where a recipient would be likely to + look for such a notice. + + You may add additional accurate notices of copyright ownership. diff --git a/lavinmq/README.md b/lavinmq/README.md new file mode 100644 index 000000000..e73c2a644 --- /dev/null +++ b/lavinmq/README.md @@ -0,0 +1,22 @@ +# jepsen.lavinmq + +A Clojure library designed to ... well, that part is up to you. + +## Usage + +FIXME + +## License + +Copyright © 2025 FIXME + +This program and the accompanying materials are made available under the +terms of the Eclipse Public License 2.0 which is available at +http://www.eclipse.org/legal/epl-2.0. + +This Source Code may also be made available under the following Secondary +Licenses when the conditions for such availability set forth in the Eclipse +Public License, v. 2.0 are satisfied: GNU General Public License as published by +the Free Software Foundation, either version 2 of the License, or (at your +option) any later version, with the GNU Classpath Exception which is available +at https://www.gnu.org/software/classpath/license.html. diff --git a/lavinmq/doc/intro.md b/lavinmq/doc/intro.md new file mode 100644 index 000000000..4efd0fef4 --- /dev/null +++ b/lavinmq/doc/intro.md @@ -0,0 +1,3 @@ +# Introduction to jepsen.lavinmq + +TODO: write [great documentation](http://jacobian.org/writing/what-to-write/) diff --git a/lavinmq/project.clj b/lavinmq/project.clj new file mode 100644 index 000000000..865314457 --- /dev/null +++ b/lavinmq/project.clj @@ -0,0 +1,18 @@ +(defproject jepsen.lavinmq "0.1.0-SNAPSHOT" + :description "FIXME: write description" + :url "http://example.com/FIXME" + :license {:name "EPL-2.0 OR GPL-2.0-or-later WITH Classpath-exception-2.0" + :url "https://www.eclipse.org/legal/epl-2.0/"} + :main jepsen.lavinmq + + :source-paths ["src/main/clojure"] + :java-source-paths ["src/main/java"] + :jvm-opts ["-Dcom.sun.management.jmxremote"] + + :dependencies [[org.clojure/clojure "1.12.0"] + [jepsen "0.3.7"] + [com.rabbitmq/amqp-client "5.24.0" + :exclusions [org.slf4j/slf4j-api]] + ] + :exclusions [org.slf4j/log4j-over-slf4j log4j/log4j] + :repl-options {:init-ns jepsen.lavinmq}) diff --git a/lavinmq/src/main/clojure/jepsen/lavinmq.clj b/lavinmq/src/main/clojure/jepsen/lavinmq.clj new file mode 100644 index 000000000..fa3f3d693 --- /dev/null +++ b/lavinmq/src/main/clojure/jepsen/lavinmq.clj @@ -0,0 +1,231 @@ +(ns jepsen.lavinmq + (:require [clojure.tools.logging :refer [debug info warn]] + [clojure.string :as str] + [clojure.java.io :as io] + [jepsen [checker :as checker] + [cli :as cli] + [client :as client] + [control :as c] + [db :as db] + [generator :as gen] + [nemesis :as nemesis] + [tests :as tests] + [util :as util :refer [timeout]] + [core :as core] + [util :refer [meh timeout log-op]] + [codec :as codec]] + [jepsen.checker.timeline :as timeline] + [jepsen.control.util :as cu] + [jepsen.os.ubuntu :as os] + [knossos.model :as model] + [knossos.core :as knossos] + [knossos.op :as op]) + (:import (com.rabbitmq.client AlreadyClosedException + ShutdownSignalException))) + + +(defn db + "LavinMQ broker." + [] + (reify db/DB + (setup! [_ test node] + (info node "installing")) + + (teardown! [_ test node] + (info node "tearing down")) + + ; db/LogFiles + ; (log-files [_ test node] + ; ["/var/log/syslog"]) + )) + +(def queue "jepsen.queue") + +(defn dequeue! + "Given a client and an operation, dequeues a value and returns the + corresponding operation." + [conn op] + (try + (let [result (com.amqp.jepsen.Utils/dequeue conn)] + (if (= nil result) + (assoc op :type :fail :value :exhausted) + (assoc op :type :ok :value result)) + ) + (catch java.util.concurrent.TimeoutException _ + (info "dequeue timed out") + (assoc op :type :fail :value :timeout)) + ) +) + +(defmacro with-ch + "Opens a channel on 'conn for body, binds it to the provided symbol 'ch, and + ensures the channel is closed after body returns." + [[ch conn] & body] + `(let [~ch (lch/open ~conn)] + (try ~@body + (finally + (try (rmq/close ~ch) + (catch AlreadyClosedException _# (info "channel was already closed")) + ))))) + +(defrecord QueueClient + [conn publish-confirm-timeout auth] + client/Client + (open! [client test node] + (info "open! called for " node auth) + (assoc client :conn (com.amqp.jepsen.Utils/createClient test node auth) + :publish-confirm-timeout (test :publish-confirm-timeout) + :auth auth) + ) + (setup! [client test] + (com.amqp.jepsen.Utils/setup conn) + client) + + (teardown! [_ test]) + ; there is nothing to tear down + (close! [_ test] + (meh (com.amqp.jepsen.Utils/close conn)) + ) + + (invoke! [client test op] + + (try + (case (:f op) + :enqueue (do + (try + (if (com.amqp.jepsen.Utils/enqueue conn (:value op) publish-confirm-timeout) + (assoc op :type :ok) + (assoc op :type :fail)) + (catch java.util.concurrent.TimeoutException _ (assoc op :type :info :error :timeout))) + ) + + :dequeue (dequeue! conn op) + + :drain (assoc op :type :ok, :value (com.amqp.jepsen.Utils/drain conn)) + ) + (catch java.util.concurrent.TimeoutException _ + (info "channel operation timed out") + (assoc op :type :info :error :timeout)) + (catch java.lang.Exception ex + (info "unexpected client exception" (.getMessage ex) "reconnecting") + (com.amqp.jepsen.Utils/reconnect conn) + (assoc op :type :fail :error :exception)) + ) + )) + +; (defn queue-client [] (QueueClient. nil nil)) + +(def network-partition-nemeses + "A map of network partition nemesis names" + {"random-partition-halves" "" + "partition-halves" "" + "partition-majorities-ring" "" + "partition-random-node" "" + }) + +(def consumer-types + "A map of consumer types" + {"asynchronous" "" + "polling" "" + "mixed" "" + }) + +(defn init-nemesis + "Returns appropriate nemesis" + [opts] + (case (:network-partition opts) + "random-partition-halves" (nemesis/partition-random-halves) + "partition-halves" (nemesis/partition-halves) + "partition-majorities-ring" (nemesis/partition-majorities-ring) + "partition-random-node" (nemesis/partition-random-node) + ) + ) + +(def enqueue-value (atom -1)) +(defn enqueue [_ _] + {:type :invoke, :f :enqueue, :value (swap! enqueue-value inc)}) +(defn dequeue [_ _] {:type :invoke, :f :dequeue}) + +(defn lavinmq-test + "Given an options map from the command-line runner (e.g. :nodes, :ssh, + :concurrency, ...), constructs a test map." + [opts] + (info :opts opts) + (let [nemesis (init-nemesis opts)] + (merge tests/noop-test + {:pure-generators true + :name "lavinmq-simple-partition" + :os os/os + :db (db) + :client (QueueClient. nil nil {:username (:amqp-username opts) :password (:amqp-password opts) :vhost (:amqp-vhost opts)}) + :nemesis nemesis + :checker (checker/compose + {:perf (checker/perf) + :queue (checker/total-queue) + }) + :generator (gen/phases + (->> (gen/mix [enqueue dequeue]) + ; FIXME could gen/stagger introduce good randomness? + (gen/delay (/ (:rate opts))) + (gen/nemesis + (cycle [(gen/sleep (:time-before-partition opts)) + {:type :info :f :start} + (gen/sleep (:partition-duration opts)) + {:type :info :f :stop}])) + (gen/time-limit (:time-limit opts))) + (gen/nemesis + (gen/once {:type :info, :f :stop})) + (gen/log "waiting for recovery") + (gen/sleep 20) + (gen/clients + (gen/each-thread + (gen/once {:type :invoke + :f :drain}))))} + opts) + )) + +(def cli-opts + "Additional command line options." + [ + ["-r" "--rate HZ" "Approximate number of enqueue/dequeue per second, per thread." + :default 50 + :parse-fn read-string + :validate [#(and (number? %) (pos? %)) "Must be a positive number"]] + [nil "--partition-duration NUM" "Duration of partition (in seconds)" + :default 10 + :parse-fn parse-long + :validate [pos? "Must be a positive integer."]] + [nil "--time-before-partition NUM" "Time before the partition starts (in seconds)" + :default 10 + :parse-fn parse-long + :validate [pos? "Must be a positive integer."]] + [nil "--network-partition NAME" "Which network partition strategy to use. Default is random-partition-halves" + :default "random-partition-halves" + :missing (str "--network-partition " (cli/one-of network-partition-nemeses)) + :validate [network-partition-nemeses (cli/one-of network-partition-nemeses)]] + [nil "--publish-confirm-timeout NUM" "Timeout for publish confirms (in milliseconds)" + :default 5000 + :parse-fn parse-long + :validate [pos? "Must be a positive integer."]] + [nil "--consumer-type TYPE" "Type of the consumers to dequeue and drain. Default is asynchronous" + :default "asynchronous" + :missing (str "--consumer-type " (cli/one-of consumer-types)) + :validate [consumer-types (cli/one-of consumer-types)]] + [nil "--dead-letter FLAG" "Use dead letter queue and TTL on messages" + :default false] + [nil "--amqp-username NAME" "username to connect to broker" + :default "guest"] + [nil "--amqp-password PWD" "password to connect to broker" + :default "guest"] + [nil "--amqp-vhost VHOST" "vhost to use" + :default "/"] + ]) + +(defn -main + "Handles command line arguments. Can either run a test, or a web server for + browsing results." + [& args] + (cli/run! (merge (cli/single-test-cmd {:test-fn lavinmq-test, :opt-spec cli-opts}) + (cli/serve-cmd)) + args)) + diff --git a/lavinmq/src/main/java/com/amqp/jepsen/Utils.java b/lavinmq/src/main/java/com/amqp/jepsen/Utils.java new file mode 100644 index 000000000..7a4b211b2 --- /dev/null +++ b/lavinmq/src/main/java/com/amqp/jepsen/Utils.java @@ -0,0 +1,662 @@ +/* + * Copyright (c) 2019-2023 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amqp.jepsen; + +import clojure.java.api.Clojure; +import clojure.lang.IPersistentVector; +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.Delivery; +import com.rabbitmq.client.GetResponse; +import java.net.SocketException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Queue; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Utils { + + static final Duration MESSAGE_TTL = Duration.ofSeconds(1); + private static final String QUEUE = "jepsen.queue"; + private static final String DEAD_LETTER_QUEUE = "jepsen.queue.dead.letter"; + private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool(); + static Logger LOGGER = LoggerFactory.getLogger("jepsen.client.utils"); + + public static Client createClient(Map test, Object node, Map auth) throws Exception { + Object consumerTypeParameter = get(test, ":consumer-type"); + String consumerType; + if (consumerTypeParameter == null) { + consumerType = "asynchronous"; + } else { + consumerType = consumerTypeParameter.toString(); + } + + Object deadLetterParameter = get(test, ":dead-letter"); + boolean deadLetter; + if (deadLetterParameter == null) { + deadLetter = false; + } else { + deadLetter = Boolean.valueOf(deadLetterParameter.toString()); + } + + Object usernameParameter = get(auth, ":username"); + String username; + if (usernameParameter == null) { + username = "guest"; + } else { + username = usernameParameter.toString(); + } + Object passwordParameter = get(auth, ":password"); + String password; + if (passwordParameter == null) { + password = "guest"; + } else { + password = passwordParameter.toString(); + } + Object vhostParameter = get(auth, ":vhost"); + String vhost; + if (vhostParameter == null) { + vhost = "/"; + } else { + vhost = vhostParameter.toString(); + } + int nodeCount = nodeCount(test); + int quorumInitialGroupSize = nodeCount; + Client client; + if ("asynchronous".equals(consumerType)) { + client = new AsynchronousConsumerClient(node.toString(), username, password, vhost, deadLetter, quorumInitialGroupSize); + } else if ("polling".equals(consumerType)) { + client = new BasicGetClient(node.toString(), username, password, vhost, deadLetter, quorumInitialGroupSize); + } else if ("mixed".equals(consumerType)) { + Random random = new Random(); + if (random.nextBoolean()) { + client = new AsynchronousConsumerClient(node.toString(), username, password, vhost, deadLetter, quorumInitialGroupSize); + } else { + client = new BasicGetClient(node.toString(), username, password, vhost, deadLetter, quorumInitialGroupSize); + } + } else { + throw new IllegalArgumentException("Unknown consumer type: " + consumerType); + } + client = new LoggingClient(client); + return client; + } + + static Object get(Map map, String keyStringValue) { + for (Map.Entry entry : map.entrySet()) { + if (keyStringValue.equals(entry.getKey().toString())) { + return entry.getValue(); + } + } + return null; + } + + static int nodeCount(Map test) { + return ((Collection) get(test, ":nodes")).size(); + } + + public static void setup(Client client) throws Exception { + client.setup(); + } + + public static void close(Client client) throws Exception { + client.close(); + } + + public static boolean enqueue(Client client, Object value, Number publishConfirmTimeout) + throws Exception { + return client.enqueue(value, publishConfirmTimeout); + } + + public static Number dequeue(Client client) throws Exception { + return client.dequeue(); + } + + public static IPersistentVector drain(Client client) throws Exception { + return client.drain(); + } + + public static void reconnect(Client client) throws Exception { + client.reconnect(); + } + + private static IPersistentVector toClojureVector(Collection values) { + StringBuilder builder = new StringBuilder("["); + values.forEach(v -> builder.append(v).append(" ")); + builder.append("]"); + return (IPersistentVector) Clojure.read(builder.toString()); + } + + static void reset() { + AbstractClient.IDS.set(0); + AbstractClient.QUEUES_DECLARED.set(false); + AbstractClient.CLIENTS.clear(); + AbstractClient.DRAINED.set(false); + } + + public interface Client { + + void setup() throws Exception; + + boolean enqueue(Object value, Number publishConfirmTimeout) throws Exception; + + Integer dequeue() throws Exception; + + IPersistentVector drain() throws Exception; + + void close() throws Exception; + + void reconnect() throws Exception; + } + + private interface CallableConsumer { + + void accept(T t) throws Exception; + } + + static class LoggingClient implements Client { + + private final Client delegate; + + LoggingClient(Client delegate) { + this.delegate = delegate; + } + + @Override + public void setup() throws Exception { + try { + delegate.setup(); + } catch (Exception e) { + log("setup", e); + throw e; + } + } + + @Override + public boolean enqueue(Object value, Number publishConfirmTimeout) throws Exception { + try { + return delegate.enqueue(value, publishConfirmTimeout); + } catch (Exception e) { + log("enqueue", e); + throw e; + } + } + + @Override + public Integer dequeue() throws Exception { + try { + return delegate.dequeue(); + } catch (Exception e) { + log("dequeue", e); + throw e; + } + } + + @Override + public IPersistentVector drain() throws Exception { + try { + return delegate.drain(); + } catch (Exception e) { + log("drain", e); + throw e; + } + } + + @Override + public void close() throws Exception { + try { + delegate.close(); + } catch (Exception e) { + log("close", e); + throw e; + } + } + + @Override + public void reconnect() throws Exception { + try { + delegate.reconnect(); + } catch (Exception e) { + log("reconnect", e); + throw e; + } + } + + private void log(String method, Exception exception) { + LOGGER.info( + delegate + + ", method " + + method + + " has failed: " + + exception.getClass().getSimpleName() + + " " + + exception.getMessage()); + } + } + + private abstract static class AbstractClient implements Client { + + protected static final Collection CLIENTS = new CopyOnWriteArrayList<>(); + protected static final Set HOSTS = ConcurrentHashMap.newKeySet(); + protected static final AtomicBoolean DRAINED = new AtomicBoolean(false); + static final AtomicInteger IDS = new AtomicInteger(0); + static final AtomicBoolean QUEUES_DECLARED = new AtomicBoolean(false); + static final Lock QUEUE_DECLARATION_LOCK = new ReentrantLock(); + protected final Integer id; + protected final String host; + protected final String username; + protected final String password; + protected final String vhost; + final boolean deadLetterMode; + private final int quorumInitialGroupSize; + final String inboundQueue, outboundQueue; + private final AtomicBoolean initialized = new AtomicBoolean(false); + protected volatile Connection connection; + protected AtomicBoolean closed = new AtomicBoolean(false); + + protected volatile Channel publishingChannel, consumingChannel; + + protected AbstractClient(String host, String username, String password, String vhost, boolean deadLetterMode, int quorumInitialGroupSize) throws Exception { + this.host = host; + this.username = username; + this.password = password; + this.vhost = vhost; + this.quorumInitialGroupSize = quorumInitialGroupSize; + HOSTS.add(host); + this.connection = createConnection(); + this.deadLetterMode = deadLetterMode; + if (this.deadLetterMode) { + this.inboundQueue = QUEUE; + this.outboundQueue = DEAD_LETTER_QUEUE; + } else { + this.inboundQueue = QUEUE; + this.outboundQueue = QUEUE; + } + id = IDS.incrementAndGet(); + } + + private static Connection createConnection(String host, String username, String password, String vhost, Duration timeout) throws Exception { + ConnectionFactory cf = new ConnectionFactory(); + cf.setAutomaticRecoveryEnabled(false); + cf.setHost(host); + cf.setUsername(username); + cf.setPassword(password); + cf.setVirtualHost(vhost); + + long elapsed = 0; + long timeoutMs = timeout.toMillis(); + while (elapsed < timeoutMs) { + try { + return cf.newConnection(); + } catch (SocketException e) { + waitMs(1000); + elapsed += 1000; + } + } + return cf.newConnection(); + } + + private static void waitMs(long ms) { + try { + Thread.sleep(ms); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + protected Connection createConnection() throws Exception { + log("createConnection host=" + this.host + ", username=" + this.username + ", vhost=" + this.vhost); + return createConnection(this.host, this.username, this.password, this.vhost, Duration.ofSeconds(30)); + } + protected Connection createConnection(String host) throws Exception { + return createConnection(host, this.username, this.password, this.vhost, Duration.ofSeconds(30)); + } + + public void initializeIfNecessary() throws Exception { + if (initialized.compareAndSet(false, true)) { + initialize(); + } + } + + protected abstract void initialize() throws Exception; + + @Override + public void setup() throws Exception { + try { + QUEUE_DECLARATION_LOCK.lock(); + if (QUEUES_DECLARED.compareAndSet(false, true)) { + try (Channel ch = connection.createChannel()) { + ch.queueDelete(inboundQueue); + } catch (Exception e) { + // OK + } + if (this.deadLetterMode) { + try (Channel ch = connection.createChannel()) { + ch.queueDelete(outboundQueue); + } catch (Exception e) { + // OK + } + } + log("Declaring " + inboundQueue); + try (Channel ch = connection.createChannel()) { + Map queueArguments = new HashMap<>(); + queueArguments.put("x-queue-type", "quorum"); + queueArguments.put("x-quorum-initial-group-size", this.quorumInitialGroupSize); + if (this.deadLetterMode) { + queueArguments.put("x-dead-letter-exchange", ""); + queueArguments.put("x-dead-letter-routing-key", this.outboundQueue); + queueArguments.put("x-dead-letter-strategy", "at-least-once"); + queueArguments.put("x-overflow", "reject-publish"); + queueArguments.put("x-message-ttl", MESSAGE_TTL.toMillis()); + } + ch.queueDeclare(inboundQueue, true, false, false, queueArguments); + Thread.sleep(1000); + ch.queuePurge(inboundQueue); + + if (this.deadLetterMode) { + log("Declaring " + outboundQueue); + queueArguments = new HashMap<>(); + queueArguments.put("x-queue-type", "quorum"); + queueArguments.put("x-quorum-initial-group-size", this.quorumInitialGroupSize); + ch.queueDeclare(outboundQueue, true, false, false, queueArguments); + Thread.sleep(1000); + ch.queuePurge(outboundQueue); + } + } + } + } finally { + QUEUE_DECLARATION_LOCK.unlock(); + } + } + + public boolean enqueue(Object value, Number publishConfirmTimeout) throws Exception { + initializeIfNecessary(); + publishingChannel.basicPublish( + "", + inboundQueue, + true, + new AMQP.BasicProperties.Builder().deliveryMode(2).build(), + value.toString().getBytes()); + return publishingChannel.waitForConfirms(publishConfirmTimeout.intValue()); + } + + protected Integer asyncDequeue(AtomicBoolean timedOut, Callable dequeueAction) + throws Exception { + Future task = EXECUTOR_SERVICE.submit(dequeueAction); + try { + return task.get(5, TimeUnit.SECONDS); + } catch (TimeoutException te) { + timedOut.set(true); + try { + task.cancel(true); + } catch (Exception e) { + log("Exception while cancelling task " + e.getMessage()); + } + throw te; + } + } + + public void close() throws Exception { + if (closed.compareAndSet(false, true)) { + connection.close(5000); + } + } + + protected void log(String message) { + LOGGER.info("Client " + host + ": " + message); + } + + public IPersistentVector drain( + AtomicBoolean drainedAlready, Collection clients, Set hosts) + throws Exception { + if (drainedAlready.compareAndSet(false, true)) { + log("Draining sequence..."); + log("Closing existing client connections..."); + for (Client client : clients) { + try { + client.close(); + } catch (Exception e) { + } + } + Thread.sleep(5000L); + Collection values = new ArrayList<>(); + + Connection c = null; + for (String h : hosts) { + log("Trying to connect to node " + h + " to drain."); + try { + c = createConnection(h); + log("Connected to " + h + " to drain."); + Channel ch = c.createChannel(); + + CallableConsumer drainAction = + queue -> { + log("Draining from " + queue); + GetResponse getResponse; + int count = 0; + while ((getResponse = ch.basicGet(queue, false)) != null) { + try { + Integer value = Integer.valueOf(new String(getResponse.getBody())); + values.add(value); + log("Drained from " + queue + ": " + value); + ch.basicAck(getResponse.getEnvelope().getDeliveryTag(), false); + count++; + } catch (Exception e) { + // ignoring, we want to drain + } + } + log("Drained " + count + " message(s) from " + queue); + }; + if (this.deadLetterMode) { + drainAction.accept(inboundQueue); + } + drainAction.accept(outboundQueue); + c.close(); + } catch (Exception e) { + log("Error while trying to drain from node " + h + ": " + e.getMessage() + "."); + } + } + + log("Drained " + values.size() + " message(s) overall"); + + return toClojureVector(values); + } else { + return toClojureVector(new ArrayList<>()); + } + } + } + + static class AsynchronousConsumerClient extends AbstractClient { + + private final Queue enqueued = new ConcurrentLinkedDeque<>(); + private final CountDownLatch cancelOkLatch = new CountDownLatch(1); + + public AsynchronousConsumerClient(String host, String username, String password, String vhost, boolean deadLetterMode, int quorumInitialGroupSize) throws Exception { + super(host, username, password, vhost, deadLetterMode, quorumInitialGroupSize); + CLIENTS.add(this); + } + + public Integer dequeue() throws Exception { + AtomicBoolean timedOut = new AtomicBoolean(false); + return asyncDequeue( + timedOut, + () -> { + initializeIfNecessary(); + if (Thread.currentThread().isInterrupted() || timedOut.get()) { + return null; + } + Delivery delivery = enqueued.poll(); + if (delivery == null) { + return null; + } else { + Integer value = Integer.valueOf(new String(delivery.getBody())); + log("Async consumer: dequeued " + value); + if (Thread.currentThread().isInterrupted() || timedOut.get()) { + log( + "Async consumer: worker thread interrupted, returning " + + value + + " to in-memory queue"); + enqueued.offer(delivery); + return null; + } + consumingChannel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); + log("Async consumer: ack-ed " + value); + if (Thread.currentThread().isInterrupted() || timedOut.get()) { + log( + "dequeue for " + + value + + " has timed out while ack-ing, Jepsen can count this as a lost message"); + } + return value; + } + }); + } + + public IPersistentVector drain() throws Exception { + return drain(DRAINED, CLIENTS, HOSTS); + } + + protected void initialize() throws Exception { + consumingChannel = this.connection.createChannel(); + // TODO make QoS configurable? + consumingChannel.basicQos(1); + log("basic.consume from " + this.outboundQueue); + consumingChannel.basicConsume( + this.outboundQueue, + false, + (consumerTag, message) -> { + Integer value = Integer.valueOf(new String(message.getBody())); + log("Received " + value + ". Enqueuing it in client in-memory queue."); + enqueued.offer(message); + log("Enqueued: " + value); + }, + (consumerTag -> cancelOkLatch.countDown())); + + publishingChannel = connection.createChannel(); + publishingChannel.confirmSelect(); + } + + @Override + public void reconnect() throws Exception { + try { + // the client close() is protected to be idempotent, so we don't call it here. + // we need the connection to be closed to make sure messages in the in-memory + // go back to the broker + connection.close(5000); + } catch (Exception e) { + } + enqueued.clear(); // not acked anyway, so go back on the queue when connection is closed + this.connection = createConnection(); + initialize(); + } + + @Override + public String toString() { + return "Async Client [" + host + "]"; + } + } + + static class BasicGetClient extends AbstractClient { + + public BasicGetClient(String host, String username, String password, String vhost, boolean deadLetterMode, int quorumInitialGroupSize) throws Exception { + super(host, username, password, vhost, deadLetterMode, quorumInitialGroupSize); + CLIENTS.add(this); + } + + @Override + public Integer dequeue() throws Exception { + AtomicBoolean timedOut = new AtomicBoolean(false); + return asyncDequeue( + timedOut, + () -> { + initializeIfNecessary(); + if (Thread.currentThread().isInterrupted() || timedOut.get()) { + return null; + } + GetResponse getResponse = consumingChannel.basicGet(outboundQueue, false); + if (getResponse == null) { + return null; + } else { + Integer value = Integer.valueOf(new String(getResponse.getBody())); + log("Dequeued " + value); + if (Thread.currentThread().isInterrupted() || timedOut.get()) { + log("Worker thread interrupted, not ack-ing " + value + ", re-queueing it"); + // the dequeue may have timed out, requeueing could avoid keeping this message + consumingChannel.basicReject(getResponse.getEnvelope().getDeliveryTag(), true); + return null; + } + consumingChannel.basicAck(getResponse.getEnvelope().getDeliveryTag(), false); + log("Ack-ed " + value + ", returning it to Jepsen"); + if (Thread.currentThread().isInterrupted() || timedOut.get()) { + log( + "dequeue for " + + value + + " has timed out while ack-ing, Jepsen can count this as a lost message"); + } + return value; + } + }); + } + + @Override + public IPersistentVector drain() throws Exception { + return drain(DRAINED, CLIENTS, HOSTS); + } + + protected void initialize() throws Exception { + publishingChannel = connection.createChannel(); + publishingChannel.confirmSelect(); + consumingChannel = connection.createChannel(); + } + + @Override + public void reconnect() throws Exception { + try { + close(); + } catch (Exception e) { + } + this.connection = createConnection(); + initialize(); + } + + @Override + public String toString() { + return "BasicGet Client [" + host + "]"; + } + } +} diff --git a/lavinmq/test/jepsen/lavinmq_test.clj b/lavinmq/test/jepsen/lavinmq_test.clj new file mode 100644 index 000000000..33bdc1dc3 --- /dev/null +++ b/lavinmq/test/jepsen/lavinmq_test.clj @@ -0,0 +1,7 @@ +(ns jepsen.lavinmq-test + (:require [clojure.test :refer :all] + [jepsen.lavinmq :refer :all])) + +(deftest a-test + (testing "FIXME, I fail." + (is (= 0 1))))