Skip to content

Commit

Permalink
general fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
kickster97 committed Jan 29, 2025
1 parent 8bc6acd commit aa466e1
Show file tree
Hide file tree
Showing 6 changed files with 9 additions and 12 deletions.
1 change: 1 addition & 0 deletions spec/mqtt/integrations/will_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ module MqttSpecs
subscribe(io, topic_filters: topic_filters)

pub = read_packet(io).should be_a(MQTT::Protocol::Publish)
pp pub
pub.payload.should eq("dead".to_slice)
pub.topic.should eq("will/t")
pub.retain?.should eq(true)
Expand Down
9 changes: 3 additions & 6 deletions src/lavinmq/mqtt/broker.cr
Original file line number Diff line number Diff line change
Expand Up @@ -76,20 +76,17 @@ module LavinMQ
def subscribe(client, topics)
session = sessions[client.client_id]? || sessions.declare(client)
session.client = client
qos = Array(MQTT::SubAck::ReturnCode).new(topics.size)
topics.each do |tf|
qos << MQTT::SubAck::ReturnCode.from_int(tf.qos)
topics.map do |tf|
session.subscribe(tf.topic, tf.qos)
@retain_store.each(tf.topic) do |topic, body|
headers = AMQP::Table.new
headers[RETAIN_HEADER] = true
headers = AMQP::Table.new({RETAIN_HEADER => true})
msg = Message.new(EXCHANGE, topic, String.new(body),
AMQP::Properties.new(headers: headers,
delivery_mode: tf.qos))
session.publish(msg)
end
MQTT::SubAck::ReturnCode.from_int(tf.qos)
end
qos
end

def unsubscribe(client_id, topics)
Expand Down
2 changes: 0 additions & 2 deletions src/lavinmq/mqtt/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -243,8 +243,6 @@ module LavinMQ
def flow(active : Bool)
end



def ack(sp)
end

Expand Down
1 change: 0 additions & 1 deletion src/lavinmq/mqtt/connection_factory.cr
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ module LavinMQ
def authenticate(io, packet)
return unless (username = packet.username) && (password = packet.password)


if split_pos = username.index(':')
@config.default_mqtt_vhost = username[0, split_pos]
username = username[split_pos + 1..]
Expand Down
2 changes: 2 additions & 0 deletions src/lavinmq/mqtt/session.cr
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ module LavinMQ
end
consumer = consumers.first.as(MQTT::Consumer)
get_packet do |pub_packet|
pp pub_packet
consumer.deliver(pub_packet)
end
Fiber.yield if (i &+= 1) % 32768 == 0
Expand Down Expand Up @@ -143,6 +144,7 @@ module LavinMQ

def build_packet(env, packet_id) : MQTT::Publish
msg = env.message
pp msg
retained = msg.properties.try &.headers.try &.["mqtt.retain"]? == true
qos = msg.properties.delivery_mode || 0u8
qos = 1u8 if qos > 1
Expand Down
6 changes: 3 additions & 3 deletions src/lavinmq/server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ module LavinMQ
Schema.migrate(@data_dir, @replicator)
@users = UserStore.new(@data_dir, @replicator)
@vhosts = VHostStore.new(@data_dir, @users, @replicator)
mqtt_brokers = MQTT::Brokers.new(@vhosts, @replicator)
@mqtt_brokers = MQTT::Brokers.new(@vhosts, @replicator)
@parameters = ParameterStore(Parameter).new(@data_dir, "parameters.json", @replicator)
@connection_factories = {
Protocol::AMQP => AMQP::ConnectionFactory.new(@users, @vhosts),
Protocol::MQTT => MQTT::ConnectionFactory.new(@users, mqtt_brokers, @config),
Protocol::MQTT => MQTT::ConnectionFactory.new(@users, @mqtt_brokers, @config),
}
apply_parameter
spawn stats_loop, name: "Server#stats_loop"
Expand Down Expand Up @@ -83,7 +83,7 @@ module LavinMQ
@users = UserStore.new(@data_dir, @replicator)
@vhosts = VHostStore.new(@data_dir, @users, @replicator)
@connection_factories[Protocol::AMQP] = AMQP::ConnectionFactory.new(@users, @vhosts)
@connection_factories[Protocol::MQTT] = MQTT::ConnectionFactory.new(@users, mqtt_brokers)
@connection_factories[Protocol::MQTT] = MQTT::ConnectionFactory.new(@users, @mqtt_brokers, @config)
@parameters = ParameterStore(Parameter).new(@data_dir, "parameters.json", @replicator)
apply_parameter
@closed = false
Expand Down

0 comments on commit aa466e1

Please sign in to comment.