Skip to content

Commit

Permalink
Merge pull request #105 from fastly/configure_nsqds
Browse files Browse the repository at this point in the history
Configurable NSQDs for Consumers and Producers
  • Loading branch information
leklund authored Mar 4, 2022
2 parents 07fc2f3 + c3ad4f7 commit 6df69d2
Show file tree
Hide file tree
Showing 10 changed files with 333 additions and 44 deletions.
4 changes: 2 additions & 2 deletions .env
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
NSQD_TCP_ADDRESS=127.0.0.1:4150
NSQD_HTTP_ADDRESS=127.0.0.1:4151
NSQLOOKUPD_TCP_ADDRESS=127.0.0.1:4160
NSQLOOKUPD_HTTP_ADDRESS=127.0.0.1:4161
NSQD_CONSUMERS=127.0.0.1:4150
NSQD_PRODUCERS=127.0.0.1:4150
86 changes: 80 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# fastly_nsq [![Build Status](https://travis-ci.org/fastly/fastly_nsq.svg?branch=main)](https://travis-ci.org/fastly/fastly_nsq)
# fastly_nsq [![Build Status](https://travis-ci.com/fastly/fastly_nsq.svg?branch=main)](https://travis-ci.com/fastly/fastly_nsq)

NSQ adapter and testing objects
for using the NSQ messaging system
Expand All @@ -12,9 +12,9 @@ We also include fakes
to make testing easier.

This library is dependent
on the [`nsq-ruby`] gem.
on the [`nsq-ruby-fastly`] gem.

[`nsq-ruby`]: https://github.com/wistia/nsq-ruby
[`nsq-ruby-fastly`]: https://github.com/fastly/nsq-ruby

Please use [GitHub Issues] to report bugs.

Expand All @@ -34,6 +34,65 @@ and `bundle install`.

## Usage

### Connections

NSQD cconnections can be discovered via nsqlookupd's or
specified explicity for consumers and producers.

#### Using nsqlookup:

Set the ENV variable to a comma sepearated string of lookups:

```
ENV['NSQLOOKUPD_HTTP_ADDRESS'] = "lookup01:1234,lookup01:1234"
```

Or configure them directly:

```
FastlyNsq.configure do |config|
config.lookupd_http_addresses = ["lookup01:1234", "lookup02:1234"]
end
```

#### Using nsqd directly:

NSQD connections can be specified for consumers and producers. Being able to set
different sets for consumers and producers facilitates removing and adding new instances
without downtime.

Set the following ENV variables to a comma sepearted string of nsqds:

```
ENV['NSQD_CONSUMERS']="nsqd01:4150,nsd02:4150"
ENV['NSQD_PRODUCERS']="nsqd01:4150,nsd02:4150"
```

Or configure them directly:

```
FastlyNsq.configure do |config|
config.consumer_nsqds = ["nsqd01:4150", "nsqd02:4150"]
config.producer_nsqds = ["nsqd01:4150", "nsqd02:4150"]
end
```

### Connection Priority

When `FastlyNsq.consumer_nsqds` or `FastlyNsq.producer_nsqds` are set they
will be used instead of `FastlyNsq.lookupd_http_addresses`.

### TLS

Set the following ENV variables to enable TLS support:

```
NSQ_SSL_KEY
NSQ_SSL_CERTIFICATE
NSQ_SSL_CA_CERTIFICATE
NSQ_SSL_VERIFY_MODE (optional)
```

### `FastlyNsq::Producer`

This is a class
Expand All @@ -50,7 +109,6 @@ message_data = {
}

producer = FastlyNsq::Producer.new(
nsqd: ENV.fetch('NSQD_TCP_ADDRESS'),
topic: topic,
)

Expand Down Expand Up @@ -281,6 +339,22 @@ expect(some_result)

## Configuration

See the [documentation](https://www.rubydoc.info/gems/fastly_nsq/FastlyNsq) for additional settings

Example:

```
FastlyNsq.configure do |config|
config.channel = "z"
config.producer_nsqds = ["nsqd01:4150", "nsqd02:4150"]
config.lookupd_http_addresses = ["lookupd01:4161", "lookupd02:4161"]
config.logger = Logger.new(STDOUT)
config.max_attempts = 10
config.max_req_timeout = 10_000
config.max_processing_pool_threads = 42
end
```

### Environment Variables

The URLs for the various
Expand All @@ -293,10 +367,10 @@ stock NSQ on OS X,
installed via Homebrew:

```shell
NSQD_TCP_ADDRESS='127.0.0.1:4150'
NSQD_HTTP_ADDRESS='127.0.0.1:4151'
NSQLOOKUPD_TCP_ADDRESS='127.0.0.1:4160'
NSQLOOKUPD_HTTP_ADDRESS='127.0.0.1:4161, 10.1.1.101:4161'
NSQD_CONSUMERS='127.0.0.1:4150'
NSQD_PRODUCERS='127.0.0.1:4150'
```

See the [`.sample.env`](examples/.sample.env) file
Expand Down
39 changes: 38 additions & 1 deletion lib/fastly_nsq.rb
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,44 @@ def max_processing_pool_threads
# Return an array of NSQ lookupd http addresses sourced from ENV['NSQLOOKUPD_HTTP_ADDRESS']
# @return [Array<String>] list of nsqlookupd http addresses
def lookupd_http_addresses
ENV.fetch("NSQLOOKUPD_HTTP_ADDRESS").split(",").map(&:strip)
@lookups ||= ENV.fetch("NSQLOOKUPD_HTTP_ADDRESS", "").split(/, ?|\s+/).map(&:strip)
end

##
# Set the lookupd_http_addresses
# @param lookups [Array] List of http lookupd addresses to use.
def lookupd_http_addresses=(lookups)
@lookups = lookups.nil? ? nil : Array(lookups)
end

##
# Return an array of NSQD TCP addresses for NSQ consumers. Defaults to the value of ENV['NSQD_CONSUMERS'].
# ENV['NSQD_CONSUMERS'] must be a comma or space seperated string of NSQD addresses
# @return [Array<String>] list of nsqd addresses
def consumer_nsqds
@consumer_nsqds ||= ENV.fetch("NSQD_CONSUMERS", "").split(/, ?|\s+/).map(&:strip)
end

##
# Set the consumer_nsqd addresses
# @param nsqd_addresses [Array] List of consumer nsqd addresses to use
def consumer_nsqds=(nsqd_addresses)
@consumer_nsqds = nsqd_addresses.nil? ? nil : Array(nsqd_addresses)
end

##
# Return an array of NSQD TCP addresses for NSQ producers. Defaults to the value of ENV['NSQD_PRODUCERS'].
# ENV['NSQD_PRODUCERS'] must be a comma or space seperated string of NSQD addresses
# @return [Array<String>] list of nsqd addresses
def producer_nsqds
@producer_nsqds ||= ENV.fetch("NSQD_PRODUCERS", "").split(/, ?|\s+/).map(&:strip)
end

##
# Set the producer_nsqd addresses
# @param nsqd_addresses [Array] List of producer nsqd addresses to use
def producer_nsqds=(nsqd_addresses)
@producer_nsqds = nsqd_addresses.nil? ? nil : Array(nsqd_addresses)
end

# Register a block to run at a point in the lifecycle.
Expand Down
33 changes: 23 additions & 10 deletions lib/fastly_nsq/consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ class FastlyNsq::Consumer
##
# Create a FastlyNsq::Consumer
#
# Will connect to NSQDs in this priority: 1. direct from FastlyNsq.consumer_nsqds 2. discovered via FastlyNsq.lookupd_http_addresses.
# If both `consumer_nsqds` and `lookupd_http_addresses` are set only the FastlyNsq.consumer_nsqds will be used.
#
# @param topic [String] NSQ topic from which to consume
# @param channel [String] NSQ channel from which to consume
# @param queue [#pop, #size] Queue object, most likely an instance of {FastlyNsq::Feeder}
Expand Down Expand Up @@ -99,15 +102,25 @@ def empty?
attr_reader :tls_options

def connect(queue, **options)
Nsq::Consumer.new(
{
nsqlookupd: FastlyNsq.lookupd_http_addresses,
topic: topic,
channel: channel,
queue: queue,
max_attempts: max_attempts,
**options
}.merge(tls_options)
)
consumers = FastlyNsq.consumer_nsqds
lookupd = FastlyNsq.lookupd_http_addresses

opts = {
topic: topic,
channel: channel,
queue: queue,
max_attempts: max_attempts,
**options
}.merge(tls_options)

if !consumers.empty?
opts[:nsqd] = consumers
elsif !lookupd.empty?
opts[:nsqlookupd] = lookupd
else
raise FastlyNsq::ConnectionFailed, "One of FastlyNsq.consumer_nsqds or FastlyNsq.lookupd_http_addresses must be present"
end

Nsq::Consumer.new(opts)
end
end
31 changes: 20 additions & 11 deletions lib/fastly_nsq/producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ class FastlyNsq::Producer
##
# Create a FastlyNsq::Producer
#
# Will connect to NSQDs in this priority: 1. direct from FastlyNsq.producer_nsqds 2. discovered via FastlyNsq.lookupd_http_addresses.
# If both `producer_nsqds` and `lookupd_http_addresses` are set only the FastlyNsq.producer_nsqds will be used.
#
# @param topic [String] NSQ topic on which to deliver the message
# @param tls_options [Hash] Hash of TSL options passed the connection.
# In most cases this should be nil unless you need to override the
Expand Down Expand Up @@ -71,27 +74,33 @@ def write(message)
# Create an Nsq::Producer and set as +@connection+ instance variable
# @return [Boolean]
def connect
producers = FastlyNsq.producer_nsqds
lookupd = FastlyNsq.lookupd_http_addresses

@connection ||= Nsq::Producer.new(
tls_options.merge(
nsqlookupd: lookupd,
topic: topic
)
)

timeout_args = [connect_timeout, FastlyNsq::ConnectionFailed]
opts = tls_options.merge(topic: topic)

if RUBY_VERSION > "2.4.0"
timeout_args << "Failed connection to #{lookupd} within #{connect_timeout} seconds"
if !producers.empty?
opts[:nsqd] = producers
elsif !lookupd.empty?
opts[:nsqlookupd] = lookupd
else
raise FastlyNsq::ConnectionFailed, "One of FastlyNsq.producer_nsqds or FastlyNsq.lookupd_http_addresses must be present"
end

@connection ||= Nsq::Producer.new(opts)

timeout_args = [
connect_timeout,
FastlyNsq::ConnectionFailed,
"Failed connection to #{opts[:nsqd] || opts[:nsqlookupd]} within #{connect_timeout} seconds"
]

Timeout.timeout(*timeout_args) { Thread.pass until connection.connected? }

true
rescue FastlyNsq::ConnectionFailed
logger.error { "Producer for #{topic} failed to connect!" }
terminate
terminate if @connection
raise
end

Expand Down
2 changes: 1 addition & 1 deletion lib/fastly_nsq/version.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# frozen_string_literal: true

module FastlyNsq
VERSION = "1.17.1"
VERSION = "1.18.0"
end
41 changes: 41 additions & 0 deletions spec/consumer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,45 @@
it { should be_empty }
end
end

describe "connection priority" do
after do
FastlyNsq.lookupd_http_addresses = nil
FastlyNsq.consumer_nsqds = nil
end

it "connects to consumer_nsqds if provided" do
allow(Nsq::Consumer).to receive(:new)

expect(FastlyNsq.lookupd_http_addresses).not_to be_empty
expect(FastlyNsq.consumer_nsqds).not_to be_empty

FastlyNsq::Consumer.new(topic: topic, channel: channel)
expect(Nsq::Consumer).to have_received(:new).with a_hash_including(nsqd: FastlyNsq.consumer_nsqds).and(excluding(:nsqlookupd))
end

it "connects to lookupd_http_addresses if consumer_nsqds is empty" do
FastlyNsq.consumer_nsqds = []
allow(Nsq::Consumer).to receive(:new)

expect(FastlyNsq.lookupd_http_addresses).not_to be_empty
expect(FastlyNsq.consumer_nsqds).to be_empty

FastlyNsq::Consumer.new(topic: topic, channel: channel)
expect(Nsq::Consumer).to have_received(:new).with a_hash_including(nsqlookupd: FastlyNsq.lookupd_http_addresses).and(excluding(:nsqd))
end

it "raises when neither consumer_nsqds or lookupd_http_addresses are available" do
FastlyNsq.consumer_nsqds = []
FastlyNsq.lookupd_http_addresses = []
allow(Nsq::Consumer).to receive(:new)

expect(FastlyNsq.lookupd_http_addresses).to be_empty
expect(FastlyNsq.consumer_nsqds).to be_empty

expect { FastlyNsq::Consumer.new(topic: topic, channel: channel) }
.to raise_error(FastlyNsq::ConnectionFailed, "One of FastlyNsq.consumer_nsqds or FastlyNsq.lookupd_http_addresses must be present")
expect(Nsq::Consumer).not_to have_received(:new)
end
end
end
Loading

0 comments on commit 6df69d2

Please sign in to comment.