Skip to content

Commit

Permalink
Merge pull request #109 from cloudamqp/support_3_13
Browse files Browse the repository at this point in the history
Support message containers introduced in RabbitMQ 3.13.0
  • Loading branch information
noxdafox authored Jun 29, 2024
2 parents 2591f9d + 415e16e commit 031ebc8
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 54 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jobs:
elixir:
- '1.16.3'
rmqref:
- v3.12.x
- v3.13.x
steps:
- uses: actions/checkout@v4
- name: Install Erlang and Elixir
Expand Down
9 changes: 5 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
PROJECT = rabbitmq_message_deduplication
PROJ_VSN = $(shell $(MIX) eval 'Mix.Project.config()[:version] |> IO.puts()')

DEPS = rabbit_common rabbit
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers amqp_client
Expand All @@ -25,15 +26,15 @@ app:: $(elixir_srcs) deps

dist:: app
mkdir -p $(DIST_DIR)
$(MIX) make_archives
$(MIX) archive.build.elixir
$(MIX) archive.build -o $(DIST_DIR)/$(PROJECT)-$(PROJ_VSN).ez
cp -r _build/$(MIX_ENV)/archives/elixir-*.ez $(DIST_DIR)
cp -r _build/$(MIX_ENV)/archives/$(PROJECT)-*.ez $(DIST_DIR)

test-build:: app
mkdir -p $(DIST_DIR)
$(MIX) make_archives
$(MIX) archive.build.elixir
$(MIX) archive.build -o $(DIST_DIR)/$(PROJECT)-$(PROJ_VSN).ez
cp -r _build/$(MIX_ENV)/archives/elixir-*.ez $(DIST_DIR)
cp -r _build/$(MIX_ENV)/archives/$(PROJECT)-*.ez $(DIST_DIR)

tests:: $(elixir_srcs) deps
MIX_ENV=test $(MIX) make_tests
Expand Down
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ Then copy all the *.ez files inside the plugins folder to the [RabbitMQ plugins
[sudo] rabbitmq-plugins enable rabbitmq_message_deduplication
```

## Version requirements

The latest version of the plugin requires RabbitMQ 3.13.0.

Earlier RabbitMQ versions are supported by 0.6.2.

## Exchange level deduplication

The exchange type `x-message-deduplication` allows to filter message duplicates before any routing rule is applied.
Expand Down
42 changes: 20 additions & 22 deletions lib/rabbitmq_message_deduplication/common.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,11 @@ defmodule RabbitMQMessageDeduplication.Common do
"""

import Record, only: [defrecord: 2, defrecord: 3, extract: 2]

require RabbitMQMessageDeduplication.Cache

alias :rabbit_binary_parser, as: RabbitBinaryParser
alias :mc, as: MC
alias RabbitMQMessageDeduplication.Cache, as: Cache

defrecord :content, extract(
:content, from_lib: "rabbit_common/include/rabbit.hrl")

@type basic_message :: record(:basic_message)
defrecord :basic_message, extract(
:basic_message, from_lib: "rabbit_common/include/rabbit.hrl")

defrecord :basic_properties, :P_basic, extract(
:P_basic, from_lib: "rabbit_common/include/rabbit_framing.hrl")

@default_arguments %{type: nil, default: nil}

@doc """
Expand All @@ -55,14 +43,24 @@ defmodule RabbitMQMessageDeduplication.Common do
@doc """
Retrieve the given header from the message.
"""
@spec message_header(basic_message, String.t) :: String.t | nil
def message_header(basic_message(content: message_content), header) do
message_content = RabbitBinaryParser.ensure_content_decoded(message_content)

case content(message_content, :properties) do
basic_properties(headers: headers) when is_list(headers) ->
rabbit_keyfind(headers, header)
basic_properties(headers: :undefined) -> nil
@spec message_header(MC.state, String.t) :: String.t | integer() | float() | boolean() | :undefined | nil
def message_header(message, header) do
case MC.x_header(header, message) do
{_type, value} when not is_list(value) and not is_tuple(value) ->
# list and tuple values have type-tagged elements
# that would need to be untagged recursively
# we don't expect to use such headers, so those cases are not handled
value
:null ->
# header value in AMQP message was {:void, :undefined}

# pre-3.13 version of this function used rabbit_keyfind/2
# which returned :undefined instead of nil or :void. We have to
# keep this value as this is used in keys to cache the message
# and is preserved during a rolling upgrade in a replicated
# Mnesia table
:undefined
:undefined -> nil
end
end

Expand All @@ -71,7 +69,7 @@ defmodule RabbitMQMessageDeduplication.Common do
If not, it adds it to the cache with the corresponding name.
"""
@spec duplicate?(tuple, basic_message, integer | nil) :: boolean
@spec duplicate?(tuple, MC.state, integer | nil) :: boolean
def duplicate?(name, message, ttl \\ nil) do
cache = cache_name(name)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,6 @@ defmodule RabbitMQMessageDeduplication.Exchange do
defrecord :exchange, extract(
:exchange, from_lib: "rabbit_common/include/rabbit.hrl")

defrecord :delivery, extract(
:delivery, from_lib: "rabbit_common/include/rabbit.hrl")

defrecord :basic_message, extract(
:basic_message, from_lib: "rabbit_common/include/rabbit.hrl")

@doc """
Register the exchange type within the Broker.
"""
Expand Down Expand Up @@ -90,7 +84,7 @@ defmodule RabbitMQMessageDeduplication.Exchange do
end

@impl :rabbit_exchange_type
def route(exchange(name: name), delivery(message: msg = basic_message())) do
def route(exchange(name: name), msg, _opts) do
if route?(name, msg) do
RabbitRouter.match_routing_key(name, [:_])
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@ defmodule RabbitMQMessageDeduplication.Queue do
"""

import Record, only: [defrecord: 2, defrecord: 3, extract: 2]
import Record, only: [defrecord: 2]

require RabbitMQMessageDeduplication.Cache
require RabbitMQMessageDeduplication.Common

alias :amqqueue, as: AMQQueue
alias :rabbit_log, as: RabbitLog
alias :rabbit_amqqueue, as: RabbitQueue
alias :mc, as: MC
alias RabbitMQMessageDeduplication.Common, as: Common
alias RabbitMQMessageDeduplication.Cache, as: Cache
alias RabbitMQMessageDeduplication.CacheManager, as: CacheManager
Expand All @@ -46,15 +47,6 @@ defmodule RabbitMQMessageDeduplication.Queue do
{:requires, :kernel_ready},
{:enables, :core_initialized}]}

defrecord :content, extract(
:content, from_lib: "rabbit_common/include/rabbit.hrl")

defrecord :basic_message, extract(
:basic_message, from_lib: "rabbit_common/include/rabbit.hrl")

defrecord :basic_properties, :P_basic, extract(
:P_basic, from_lib: "rabbit_common/include/rabbit_framing.hrl")

defrecord :dqack, [:tag, :header]
defrecord :dqstate, [:queue, :queue_state, dedup_enabled: false]

Expand Down Expand Up @@ -286,8 +278,9 @@ defmodule RabbitMQMessageDeduplication.Queue do
if dedup_queue?(state) do
case fetch(need_ack, state) do
{:empty, state} -> {:empty, state}
{{message = basic_message(id: id), _, ack_tag}, state} ->
{{message, _, ack_tag}, state} ->
maybe_delete_cache_entry(queue, message)
id = MC.get_annotation(:id, message)

{{id, ack_tag}, state}
end
Expand Down Expand Up @@ -521,7 +514,7 @@ defmodule RabbitMQMessageDeduplication.Queue do
end

# Returns true if the message is a duplicate.
defp duplicate?(queue, message = basic_message()) do
defp duplicate?(queue, message) do
name = AMQQueue.get_name(queue)

if Common.duplicate?(name, message, message_expiration(message)) do
Expand All @@ -533,18 +526,14 @@ defmodule RabbitMQMessageDeduplication.Queue do

# Returns the expiration property of the given message
defp message_expiration(message) do
basic_message(content: content(properties: properties)) = message

case properties do
basic_properties(expiration: ttl) when is_bitstring(ttl) ->
String.to_integer(ttl)
basic_properties(expiration: :undefined) -> nil
case MC.ttl(message) do
:undefined -> nil
ttl -> ttl
end
end

# Removes the message deduplication header from the cache
defp maybe_delete_cache_entry(queue, msg = basic_message()) do
defp maybe_delete_cache_entry(queue, msg) when is_tuple(msg) do
header = Common.message_header(msg, "x-deduplication-header")
maybe_delete_cache_entry(queue, header)
end
Expand Down
3 changes: 2 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Mixfile do
applications: [:mnesia],
extra_applications: [:rabbit],
mod: {RabbitMQMessageDeduplication, []},
registered: [RabbitMQMessageDeduplication]
registered: [RabbitMQMessageDeduplication],
broker_version_requirements: ["3.13.0"]
]
end

Expand Down

0 comments on commit 031ebc8

Please sign in to comment.