Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reconnect if client closes connection #7

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
20 changes: 17 additions & 3 deletions lib/sonic/channels/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,23 @@ def help(manual)
end

def quit
execute('QUIT')
if connection.connected?
execute('QUIT')
connection.disconnect
return true
end

return false
end

def close
connection.disconnect
end

def connected?
connection.connected?
end

private

def execute(*args)
Expand All @@ -33,7 +46,8 @@ def normalize(value)
end

def sanitize(value)
value.gsub('"', '\\"').gsub(/[\r\n]+/, ' ')
# remove backslashes entirely
value.gsub(/\\/, "").gsub('"', '\\"').gsub(/[\r\n]+/, '\\n')
end

def quote(value)
Expand All @@ -46,7 +60,7 @@ def type_cast_response(value)
elsif value.start_with?('RESULT ')
value.split(' ').last.to_i
elsif value.start_with?('EVENT ')
value.split(' ')[3..-1].join(' ')
value.split(' ')[3..-1]
else
value
end
Expand Down
4 changes: 4 additions & 0 deletions lib/sonic/channels/control.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ class Control < Base
def trigger(action)
execute('TRIGGER', action)
end

def info
execute('INFO')
end
end
end
end
4 changes: 2 additions & 2 deletions lib/sonic/channels/ingest.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ module Sonic
module Channels
class Ingest < Base
def push(collection, bucket, object, text, lang = nil)
arr = [collection, bucket, object, quote(text)]
arr = [collection, bucket, object, normalize(text)]
arr << "LANG(#{lang})" if lang

execute('PUSH', *arr)
end

def pop(collection, bucket, object, text)
execute('POP', collection, bucket, object, quote(text))
execute('POP', collection, bucket, object, normalize(text))
end

def count(collection, bucket = nil, object = nil)
Expand Down
14 changes: 12 additions & 2 deletions lib/sonic/channels/search.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module Sonic
module Channels
class Search < Base
def query(collection, bucket, terms, limit = nil, offset = nil, lang = nil) # rubocop:disable Metrics/ParameterLists, Metrics/LineLength
arr = [collection, bucket, quote(terms)]
arr = [collection, bucket, normalize(terms)]
arr << "LIMIT(#{limit})" if limit
arr << "OFFSET(#{offset})" if offset
arr << "LANG(#{lang})" if lang
Expand All @@ -13,13 +13,23 @@ def query(collection, bucket, terms, limit = nil, offset = nil, lang = nil) # ru
end

def suggest(collection, bucket, word, limit = nil)
arr = [collection, bucket, quote(word)]
arr = [collection, bucket, normalize(word)]
arr << "LIMIT(#{limit})" if limit

execute('SUGGEST', *arr) do
connection.read # ...
end
end

def list(collection, bucket, limit = nil, offset = nil)
arr = [collection, bucket]
arr << "LIMIT(#{limit})" if limit
arr << "OFFSET(#{offset})" if offset

execute('LIST', *arr) do
connection.read # ...
end
end
end
end
end
39 changes: 33 additions & 6 deletions lib/sonic/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,57 @@ def initialize(host, port, channel_type, password = nil)
end

def connect
read # ...
socket.gets # ...
write(['START', @channel_type, @password].compact.join(' '))
read.start_with?('STARTED ')
end

def disconnect
socket.close
socket&.close
socket = nil
end

def connected?
!socket.nil?
end

def read
data = socket.gets.chomp
raise ServerError, data if data.start_with?('ERR ')
data = socket.gets&.chomp

if data.nil?
# connection was dropped from timeout
disconnect
raise ServerError, "Connection expired. Please reconnect."
end

raise ServerError, "#{data.force_encoding('UTF-8')} (#{@last_write})" if data.start_with?('ENDED ')
raise ServerError, "#{data.force_encoding('UTF-8')} (#{@last_write})" if data.start_with?('ERR ')

data
end

def write(data)
socket.puts(data)
@last_write = data

begin
socket.puts(data)
rescue Errno::EPIPE => error
disconnect
raise ServerError, "Connection expired. Please reconnect.", error.backtrace
end
end


private

def socket
@socket ||= TCPSocket.open(@host, @port)
@socket ||= begin
socket = TCPSocket.open(@host, @port)
# disables Nagle's Algorithm, prevents multiple round trips with MULTI
socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
socket
end
end

end
end