Skip to content

Commit

Permalink
pass will specs
Browse files Browse the repository at this point in the history
  • Loading branch information
kickster97 committed Oct 21, 2024
1 parent e0a9c0f commit 76aa9c3
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 16 deletions.
14 changes: 7 additions & 7 deletions spec/mqtt/integrations/will_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ module MqttSpecs
extend MqttMatchers

describe "client will" do
pending "will is not delivered on graceful disconnect [MQTT-3.14.4-3]" do
it "will is not delivered on graceful disconnect [MQTT-3.14.4-3]" do
with_server do |server|
with_client_io(server) do |io|
connect(io)
Expand Down Expand Up @@ -34,7 +34,7 @@ module MqttSpecs
end
end

pending "will is delivered on ungraceful disconnect" do
it "will is delivered on ungraceful disconnect" do
with_server do |server|
with_client_io(server) do |io|
connect(io)
Expand All @@ -59,7 +59,7 @@ module MqttSpecs
end
end

pending "will can be retained [MQTT-3.1.2-17]" do
it "will can be retained [MQTT-3.1.2-17]" do
with_server do |server|
with_client_io(server) do |io2|
will = MQTT::Protocol::Will.new(
Expand All @@ -85,7 +85,7 @@ module MqttSpecs
end
end

pending "will won't be published if missing permission" do
it "will won't be published if missing permission" do
with_server do |server|
with_client_io(server) do |io|
connect(io)
Expand All @@ -110,7 +110,7 @@ module MqttSpecs
end
end

pending "will qos can't be set of will flag is unset [MQTT-3.1.2-13]" do
it "will qos can't be set of will flag is unset [MQTT-3.1.2-13]" do
with_server do |server|
with_client_io(server) do |io|
temp_io = IO::Memory.new
Expand All @@ -127,7 +127,7 @@ module MqttSpecs
end
end

pending "will qos must not be 3 [MQTT-3.1.2-14]" do
it "will qos must not be 3 [MQTT-3.1.2-14]" do
with_server do |server|
with_client_io(server) do |io|
temp_io = IO::Memory.new
Expand All @@ -146,7 +146,7 @@ module MqttSpecs
end
end

pending "will retain can't be set of will flag is unset [MQTT-3.1.2-15]" do
it "will retain can't be set of will flag is unset [MQTT-3.1.2-15]" do
with_server do |server|
with_client_io(server) do |io|
temp_io = IO::Memory.new
Expand Down
2 changes: 1 addition & 1 deletion src/lavinmq/exchange/mqtt.cr
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ module LavinMQ
@retain_store.retain(topic, msg.body_io, msg.bodysize)
end

@tree.each_entry(topic) do |queue, qos|
@tree.each_entry(msg.routing_key) do |queue, qos|
msg.properties.delivery_mode = qos
if queue.publish(msg)
count += 1
Expand Down
17 changes: 11 additions & 6 deletions src/lavinmq/mqtt/broker.cr
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,16 @@ module LavinMQ
@clients.delete client_id
end

def publish(packet : MQTT::Publish)
def create_properties(packet : MQTT::Publish | MQTT::Will) : AMQP::Properties
headers = AMQP::Table.new
headers["x-mqtt-retain"] = true if packet.retain?
headers["x-mqtt-will"] = true if packet.is_a?(MQTT::Will)
AMQP::Properties.new(headers: headers).tap { |props| props.delivery_mode = packet.qos if packet.responds_to?(:qos) }
end

def publish(packet : MQTT::Publish | MQTT::Will)
rk = topicfilter_to_routingkey(packet.topic)
properties = if packet.retain?
AMQP::Properties.new(headers: AMQP::Table.new({"x-mqtt-retain": true}))
else
AMQ::Protocol::Properties.new
end
properties = create_properties(packet)
# TODO: String.new around payload.. should be stored as Bytes
msg = Message.new("mqtt.default", rk, String.new(packet.payload), properties)
@exchange.publish(msg, false)
Expand All @@ -103,6 +106,8 @@ module LavinMQ
packet.topic_filters.each do |tf|
qos << MQTT::SubAck::ReturnCode.from_int(tf.qos)
session.subscribe(tf.topic, tf.qos)

#Publish retained messages
@retain_store.each(tf.topic) do |topic, body|
rk = topicfilter_to_routingkey(topic)
msg = Message.new("mqtt.default", rk, String.new(body),
Expand Down
3 changes: 1 addition & 2 deletions src/lavinmq/mqtt/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ module LavinMQ
raise ex
ensure
@broker.disconnect_client(client_id)

@socket.close
# move to disconnect client
@broker.vhost.rm_connection(self)
Expand Down Expand Up @@ -128,9 +127,9 @@ module LavinMQ
}.merge(stats_details)
end

# TODO: actually publish will to session
private def publish_will
if will = @will
@broker.publish(will)
end
rescue ex
@log.warn { "Failed to publish will: #{ex.message}" }
Expand Down

0 comments on commit 76aa9c3

Please sign in to comment.