Skip to content

Commit

Permalink
remove redundant test and add test and adapt code for mqtt-3.3.1-9
Browse files Browse the repository at this point in the history
  • Loading branch information
kickster97 committed Feb 6, 2025
1 parent 87f5c46 commit 7f8d742
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 24 deletions.
52 changes: 30 additions & 22 deletions spec/mqtt/integrations/retained_messages_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,36 @@ module MqttSpecs
end
end

it "retain flag is 0 if there is an established subscription [MQTT-3.3.1-9]" do
with_server do |server|
wg = WaitGroup.new(1)
with_client_io(server) do |io|
spawn do
with_client_io(server) do |io|
connect(io, client_id: "subscriber")
subscribe(io, topic_filters: [subtopic("a/b", 1u8)])
pub = read_packet(io).as(MQTT::Protocol::Publish)
pub.retain?.should eq(true)
wg.done
pub = read_packet(io).as(MQTT::Protocol::Publish)
pub.retain?.should eq(false)
wg.done
end
end
end

with_client_io(server) do |io|
connect(io, client_id: "publisher")
publish(io, topic: "a/b", qos: 0u8, retain: true)
wg.wait
wg.add(1)
publish(io, topic: "a/b", qos: 0u8, retain: true)
wg.wait
disconnect(io)
end
end
end

it "retained messages are redelivered for subscriptions with qos1" do
with_server do |server|
with_client_io(server) do |io|
Expand Down Expand Up @@ -53,27 +83,5 @@ module MqttSpecs
end
end
end

it "retain is set in PUBLISH for retained messages" do
with_server do |server|
with_client_io(server) do |io|
connect(io)
publish(io, topic: "a/b", qos: 0u8, retain: true)
disconnect(io)
end

with_client_io(server) do |io|
connect(io)
# Subscribe with qos=0 means downgrade messages to qos=0
topic_filters = mk_topic_filters({"a/b", 0u8})
subscribe(io, topic_filters: topic_filters)

pub = read_packet(io).as(MQTT::Protocol::Publish)
pub.retain?.should eq(true)

disconnect(io)
end
end
end
end
end
1 change: 1 addition & 0 deletions spec/mqtt/spec_helper/mqtt_helpers_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ module MqttHelpers
def mk_topic_filters(*args) : Array(MQTT::Protocol::Subscribe::TopicFilter)
ret = Array(MQTT::Protocol::Subscribe::TopicFilter).new
args.each { |topic, qos| ret << subtopic(topic, qos) }
pp ret
ret
end

Expand Down
3 changes: 1 addition & 2 deletions src/lavinmq/mqtt/exchange.cr
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ module LavinMQ

def publish(packet : MQTT::Publish) : Int32
@publish_in_count += 1
headers = AMQP::Table.new({RETAIN_HEADER: true}) if packet.retain?
properties = AMQP::Properties.new(headers: headers)
properties = AMQP::Properties.new(headers: AMQP::Table.new)
properties.delivery_mode = packet.qos

timestamp = RoughTime.unix_ms
Expand Down

0 comments on commit 7f8d742

Please sign in to comment.