Skip to content

Commit

Permalink
Parse AMQP frames and send one per WS frame
Browse files Browse the repository at this point in the history
  • Loading branch information
carlhoerberg committed May 27, 2024
1 parent 7b35e34 commit 6a84b1e
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 9 deletions.
6 changes: 5 additions & 1 deletion shard.lock
Original file line number Diff line number Diff line change
@@ -1,2 +1,6 @@
version: 2.0
shards: {}
shards:
amq-protocol:
git: https://github.com/cloudamqp/amq-protocol.cr.git
version: 1.1.14

4 changes: 4 additions & 0 deletions shard.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ targets:
websocket-tcp-relay:
main: src/websocket-tcp-relay.cr

dependencies:
amq-protocol:
github: cloudamqp/amq-protocol.cr

crystal: 1.0.0

license: Apache-2.0
36 changes: 28 additions & 8 deletions src/websocket-tcp-relay/websocket_relay.cr
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
require "http/server"
require "openssl"
require "amq-protocol"

module WebSocketTCPRelay
class WebSocketRelay
Expand Down Expand Up @@ -38,20 +39,39 @@ module WebSocketTCPRelay
socket.as?(TCPSocket).try &.sync = true
socket.as?(OpenSSL::SSL::Socket::Client).try &.sync = true

ws.on_binary do |bytes|
socket.write(bytes)
end

ws.on_close do |_code, _message|
socket.close
end

amqp_protocol = Channel(Bool).new
first_bytes = true
ws.on_binary do |bytes|
if first_bytes
first_bytes = false
if bytes == AMQ::Protocol::PROTOCOL_START_0_9_1
amqp_protocol.send true
else
amqp_protocol.send false
end
end
socket.write(bytes)
end
spawn(name: "WS #{remote_addr}") do
begin
count = 0
buffer = Bytes.new(4096)
while (count = socket.read(buffer)) > 0
ws.send(buffer[0, count])
if amqp_protocol.receive
mem = IO::Memory.new(4096)
loop do
frame = AMQ::Protocol::Frame.from_io(socket)
frame.to_io(mem, IO::ByteFormat::NetworkEndian)
ws.send(mem.to_slice)
mem.clear
end
else
buffer = Bytes.new(4096)
count = 0
while (count = socket.read(buffer)) > 0
ws.send(buffer[0, count])
end
end
puts "#{remote_addr} disconnected by server"
rescue ex
Expand Down

0 comments on commit 6a84b1e

Please sign in to comment.