As we reevaluate how to best support and maintain Staging Ref in the future, we encourage development teams using this environment to highlight their use cases in the following issue: https://gitlab.com/gitlab-com/gl-infra/software-delivery/framework/software-delivery-framework-issue-tracker/-/issues/36.

Skip to content
Snippets Groups Projects
Commit d8f3b9bc authored by Stan Hu's avatar Stan Hu
Browse files

Add redis-rb as a vendored library

`make update-redis` will clone the library and adjust the paths properly
parent b99bc5d4
No related branches found
No related tags found
No related merge requests found
Showing
with 5306 additions and 6 deletions
Loading
Loading
@@ -8,3 +8,4 @@ coverage/
.bundle
tags
.bundle/
*.orig
source "http://rubygems.org"
gem 'redis', '~> 3.3'
group :development, :test do
gem 'coveralls', require: false
gem 'rspec', '~> 2.14.0'
Loading
Loading
Loading
Loading
@@ -37,7 +37,6 @@ GEM
method_source (~> 0.8)
slop (~> 3.3.1)
rainbow (2.0.0)
redis (3.3.0)
rest-client (1.7.2)
mime-types (>= 1.16, < 3.0)
netrc (~> 0.7)
Loading
Loading
@@ -78,7 +77,6 @@ DEPENDENCIES
coveralls
guard
guard-rspec
redis (~> 3.3)
rspec (~> 2.14.0)
rubocop (= 0.28.0)
vcr
Loading
Loading
Makefile 0 → 100644
REDIS_RB_VERSION=v3.3.0
REDIS_RB_TMP_DIR := $(shell mktemp -d)
REDIS_RB_VENDOR_DIR=lib/vendor
PWD=`pwd`
all:
update-redis:
cd lib
git clone https://github.com/redis/redis-rb.git $(REDIS_RB_TMP_DIR)
cd $(REDIS_RB_TMP_DIR); git checkout $(REDIS_RB_VERSION)
cd $(PWD)
mkdir -p $(REDIS_RB_VENDOR_DIR)
cp -r $(REDIS_RB_TMP_DIR)/lib/* $(REDIS_RB_VENDOR_DIR)
# Adjust all 'require redis/' paths to relative paths
sed -i.orig -e 's/require "redis/require_relative "redis/g' $(REDIS_RB_VENDOR_DIR)/redis.rb
find $(REDIS_RB_VENDOR_DIR)/redis -name \*.rb -maxdepth 1 -exec sed -i.orig -e "s/require \"redis\//require_relative \"/g" {} \;
find $(REDIS_RB_VENDOR_DIR)/redis/connection -name \*.rb -maxdepth 1 -exec sed -i.orig -e 's/require "redis\/connection\//require_relative "/g' *.rb {} \;
find $(REDIS_RB_VENDOR_DIR)/redis/connection -name \*.rb -maxdepth 1 -exec sed -i.orig -e 's/require "redis\//require_relative "..\//g' *.rb {} \;
.PHONY=update-redis
require 'net/http'
require 'openssl'
require 'json'
require 'redis'
require_relative 'gitlab_config'
require_relative 'gitlab_logger'
require_relative 'gitlab_access'
require_relative 'httpunix'
require_relative 'vendor/redis'
class GitlabNet
class ApiUnreachableError < StandardError; end
Loading
Loading
Loading
Loading
@@ -2,7 +2,6 @@ require_relative 'gitlab_init'
require_relative 'gitlab_net'
require 'json'
require 'base64'
require 'redis'
require 'securerandom'
class GitlabPostReceive
Loading
Loading
This diff is collapsed.
require_relative "errors"
require "socket"
require "cgi"
class Redis
class Client
DEFAULTS = {
:url => lambda { ENV["REDIS_URL"] },
:scheme => "redis",
:host => "127.0.0.1",
:port => 6379,
:path => nil,
:timeout => 5.0,
:password => nil,
:db => 0,
:driver => nil,
:id => nil,
:tcp_keepalive => 0,
:reconnect_attempts => 1,
:inherit_socket => false
}
def options
Marshal.load(Marshal.dump(@options))
end
def scheme
@options[:scheme]
end
def host
@options[:host]
end
def port
@options[:port]
end
def path
@options[:path]
end
def read_timeout
@options[:read_timeout]
end
def connect_timeout
@options[:connect_timeout]
end
def timeout
@options[:read_timeout]
end
def password
@options[:password]
end
def db
@options[:db]
end
def db=(db)
@options[:db] = db.to_i
end
def driver
@options[:driver]
end
def inherit_socket?
@options[:inherit_socket]
end
attr_accessor :logger
attr_reader :connection
attr_reader :command_map
def initialize(options = {})
@options = _parse_options(options)
@reconnect = true
@logger = @options[:logger]
@connection = nil
@command_map = {}
@pending_reads = 0
if options.include?(:sentinels)
@connector = Connector::Sentinel.new(@options)
else
@connector = Connector.new(@options)
end
end
def connect
@pid = Process.pid
# Don't try to reconnect when the connection is fresh
with_reconnect(false) do
establish_connection
call [:auth, password] if password
call [:select, db] if db != 0
call [:client, :setname, @options[:id]] if @options[:id]
@connector.check(self)
end
self
end
def id
@options[:id] || "redis://#{location}/#{db}"
end
def location
path || "#{host}:#{port}"
end
def call(command)
reply = process([command]) { read }
raise reply if reply.is_a?(CommandError)
if block_given?
yield reply
else
reply
end
end
def call_loop(command, timeout = 0)
error = nil
result = with_socket_timeout(timeout) do
process([command]) do
loop do
reply = read
if reply.is_a?(CommandError)
error = reply
break
else
yield reply
end
end
end
end
# Raise error when previous block broke out of the loop.
raise error if error
# Result is set to the value that the provided block used to break.
result
end
def call_pipeline(pipeline)
with_reconnect pipeline.with_reconnect? do
begin
pipeline.finish(call_pipelined(pipeline.commands)).tap do
self.db = pipeline.db if pipeline.db
end
rescue ConnectionError => e
return nil if pipeline.shutdown?
# Assume the pipeline was sent in one piece, but execution of
# SHUTDOWN caused none of the replies for commands that were executed
# prior to it from coming back around.
raise e
end
end
end
def call_pipelined(commands)
return [] if commands.empty?
# The method #ensure_connected (called from #process) reconnects once on
# I/O errors. To make an effort in making sure that commands are not
# executed more than once, only allow reconnection before the first reply
# has been read. When an error occurs after the first reply has been
# read, retrying would re-execute the entire pipeline, thus re-issuing
# already successfully executed commands. To circumvent this, don't retry
# after the first reply has been read successfully.
result = Array.new(commands.size)
reconnect = @reconnect
begin
exception = nil
process(commands) do
result[0] = read
@reconnect = false
(commands.size - 1).times do |i|
reply = read
result[i + 1] = reply
exception = reply if exception.nil? && reply.is_a?(CommandError)
end
end
raise exception if exception
ensure
@reconnect = reconnect
end
result
end
def call_with_timeout(command, timeout, &blk)
with_socket_timeout(timeout) do
call(command, &blk)
end
rescue ConnectionError
retry
end
def call_without_timeout(command, &blk)
call_with_timeout(command, 0, &blk)
end
def process(commands)
logging(commands) do
ensure_connected do
commands.each do |command|
if command_map[command.first]
command = command.dup
command[0] = command_map[command.first]
end
write(command)
end
yield if block_given?
end
end
end
def connected?
!! (connection && connection.connected?)
end
def disconnect
connection.disconnect if connected?
end
def reconnect
disconnect
connect
end
def io
yield
rescue TimeoutError => e1
# Add a message to the exception without destroying the original stack
e2 = TimeoutError.new("Connection timed out")
e2.set_backtrace(e1.backtrace)
raise e2
rescue Errno::ECONNRESET, Errno::EPIPE, Errno::ECONNABORTED, Errno::EBADF, Errno::EINVAL => e
raise ConnectionError, "Connection lost (%s)" % [e.class.name.split("::").last]
end
def read
io do
value = connection.read
@pending_reads -= 1
value
end
end
def write(command)
io do
@pending_reads += 1
connection.write(command)
end
end
def with_socket_timeout(timeout)
connect unless connected?
begin
connection.timeout = timeout
yield
ensure
connection.timeout = self.timeout if connected?
end
end
def without_socket_timeout(&blk)
with_socket_timeout(0, &blk)
end
def with_reconnect(val=true)
begin
original, @reconnect = @reconnect, val
yield
ensure
@reconnect = original
end
end
def without_reconnect(&blk)
with_reconnect(false, &blk)
end
protected
def logging(commands)
return yield unless @logger && @logger.debug?
begin
commands.each do |name, *args|
logged_args = args.map do |a|
case
when a.respond_to?(:inspect) then a.inspect
when a.respond_to?(:to_s) then a.to_s
else
# handle poorly-behaved descendants of BasicObject
klass = a.instance_exec { (class << self; self end).superclass }
"\#<#{klass}:#{a.__id__}>"
end
end
@logger.debug("[Redis] command=#{name.to_s.upcase} args=#{logged_args.join(' ')}")
end
t1 = Time.now
yield
ensure
@logger.debug("[Redis] call_time=%0.2f ms" % ((Time.now - t1) * 1000)) if t1
end
end
def establish_connection
server = @connector.resolve.dup
@options[:host] = server[:host]
@options[:port] = Integer(server[:port]) if server.include?(:port)
@connection = @options[:driver].connect(@options)
@pending_reads = 0
rescue TimeoutError,
Errno::ECONNREFUSED,
Errno::EHOSTDOWN,
Errno::EHOSTUNREACH,
Errno::ENETUNREACH,
Errno::ETIMEDOUT
raise CannotConnectError, "Error connecting to Redis on #{location} (#{$!.class})"
end
def ensure_connected
disconnect if @pending_reads > 0
attempts = 0
begin
attempts += 1
if connected?
unless inherit_socket? || Process.pid == @pid
raise InheritedError,
"Tried to use a connection from a child process without reconnecting. " +
"You need to reconnect to Redis after forking " +
"or set :inherit_socket to true."
end
else
connect
end
yield
rescue BaseConnectionError
disconnect
if attempts <= @options[:reconnect_attempts] && @reconnect
retry
else
raise
end
rescue Exception
disconnect
raise
end
end
def _parse_options(options)
return options if options[:_parsed]
defaults = DEFAULTS.dup
options = options.dup
defaults.keys.each do |key|
# Fill in defaults if needed
if defaults[key].respond_to?(:call)
defaults[key] = defaults[key].call
end
# Symbolize only keys that are needed
options[key] = options[key.to_s] if options.has_key?(key.to_s)
end
url = options[:url] || defaults[:url]
# Override defaults from URL if given
if url
require "uri"
uri = URI(url)
if uri.scheme == "unix"
defaults[:path] = uri.path
elsif uri.scheme == "redis" || uri.scheme == "rediss"
defaults[:scheme] = uri.scheme
defaults[:host] = uri.host if uri.host
defaults[:port] = uri.port if uri.port
defaults[:password] = CGI.unescape(uri.password) if uri.password
defaults[:db] = uri.path[1..-1].to_i if uri.path
defaults[:role] = :master
else
raise ArgumentError, "invalid uri scheme '#{uri.scheme}'"
end
defaults[:ssl] = true if uri.scheme == "rediss"
end
# Use default when option is not specified or nil
defaults.keys.each do |key|
options[key] = defaults[key] if options[key].nil?
end
if options[:path]
# Unix socket
options[:scheme] = "unix"
options.delete(:host)
options.delete(:port)
else
# TCP socket
options[:host] = options[:host].to_s
options[:port] = options[:port].to_i
end
if options.has_key?(:timeout)
options[:connect_timeout] ||= options[:timeout]
options[:read_timeout] ||= options[:timeout]
options[:write_timeout] ||= options[:timeout]
end
options[:connect_timeout] = Float(options[:connect_timeout])
options[:read_timeout] = Float(options[:read_timeout])
options[:write_timeout] = Float(options[:write_timeout])
options[:db] = options[:db].to_i
options[:driver] = _parse_driver(options[:driver]) || Connection.drivers.last
case options[:tcp_keepalive]
when Hash
[:time, :intvl, :probes].each do |key|
unless options[:tcp_keepalive][key].is_a?(Fixnum)
raise "Expected the #{key.inspect} key in :tcp_keepalive to be a Fixnum"
end
end
when Fixnum
if options[:tcp_keepalive] >= 60
options[:tcp_keepalive] = {:time => options[:tcp_keepalive] - 20, :intvl => 10, :probes => 2}
elsif options[:tcp_keepalive] >= 30
options[:tcp_keepalive] = {:time => options[:tcp_keepalive] - 10, :intvl => 5, :probes => 2}
elsif options[:tcp_keepalive] >= 5
options[:tcp_keepalive] = {:time => options[:tcp_keepalive] - 2, :intvl => 2, :probes => 1}
end
end
options[:_parsed] = true
options
end
def _parse_driver(driver)
driver = driver.to_s if driver.is_a?(Symbol)
if driver.kind_of?(String)
begin
require_relative "connection/#{driver}"
driver = Connection.const_get(driver.capitalize)
rescue LoadError, NameError
raise RuntimeError, "Cannot load driver #{driver.inspect}"
end
end
driver
end
class Connector
def initialize(options)
@options = options.dup
end
def resolve
@options
end
def check(client)
end
class Sentinel < Connector
def initialize(options)
super(options)
@options[:password] = DEFAULTS.fetch(:password)
@options[:db] = DEFAULTS.fetch(:db)
@sentinels = @options.delete(:sentinels).dup
@role = @options.fetch(:role, "master").to_s
@master = @options[:host]
end
def check(client)
# Check the instance is really of the role we are looking for.
# We can't assume the command is supported since it was introduced
# recently and this client should work with old stuff.
begin
role = client.call([:role])[0]
rescue Redis::CommandError
# Assume the test is passed if we can't get a reply from ROLE...
role = @role
end
if role != @role
client.disconnect
raise ConnectionError, "Instance role mismatch. Expected #{@role}, got #{role}."
end
end
def resolve
result = case @role
when "master"
resolve_master
when "slave"
resolve_slave
else
raise ArgumentError, "Unknown instance role #{@role}"
end
result || (raise ConnectionError, "Unable to fetch #{@role} via Sentinel.")
end
def sentinel_detect
@sentinels.each do |sentinel|
client = Client.new(@options.merge({
:host => sentinel[:host],
:port => sentinel[:port],
:reconnect_attempts => 0,
}))
begin
if result = yield(client)
# This sentinel responded. Make sure we ask it first next time.
@sentinels.delete(sentinel)
@sentinels.unshift(sentinel)
return result
end
rescue BaseConnectionError
ensure
client.disconnect
end
end
raise CannotConnectError, "No sentinels available."
end
def resolve_master
sentinel_detect do |client|
if reply = client.call(["sentinel", "get-master-addr-by-name", @master])
{:host => reply[0], :port => reply[1]}
end
end
end
def resolve_slave
sentinel_detect do |client|
if reply = client.call(["sentinel", "slaves", @master])
slave = Hash[*reply.sample]
{:host => slave.fetch("ip"), :port => slave.fetch("port")}
end
end
end
end
end
end
end
require_relative "connection/registry"
# If a connection driver was required before this file, the array
# Redis::Connection.drivers will contain one or more classes. The last driver
# in this array will be used as default driver. If this array is empty, we load
# the plain Ruby driver as our default. Another driver can be required at a
# later point in time, causing it to be the last element of the #drivers array
# and therefore be chosen by default.
require_relative "connection/ruby" if Redis::Connection.drivers.empty?
class Redis
module Connection
module CommandHelper
COMMAND_DELIMITER = "\r\n"
def build_command(args)
command = [nil]
args.each do |i|
if i.is_a? Array
i.each do |j|
j = j.to_s
command << "$#{j.bytesize}"
command << j
end
else
i = i.to_s
command << "$#{i.bytesize}"
command << i
end
end
command[0] = "*#{(command.length - 1) / 2}"
# Trailing delimiter
command << ""
command.join(COMMAND_DELIMITER)
end
protected
if defined?(Encoding::default_external)
def encode(string)
string.force_encoding(Encoding::default_external)
end
else
def encode(string)
string
end
end
end
end
end
require_relative "registry"
require_relative "../errors"
require "hiredis/connection"
require "timeout"
class Redis
module Connection
class Hiredis
def self.connect(config)
connection = ::Hiredis::Connection.new
connect_timeout = (config.fetch(:connect_timeout, 0) * 1_000_000).to_i
if config[:scheme] == "unix"
connection.connect_unix(config[:path], connect_timeout)
elsif config[:scheme] == "rediss" || config[:ssl]
raise NotImplementedError, "SSL not supported by hiredis driver"
else
connection.connect(config[:host], config[:port], connect_timeout)
end
instance = new(connection)
instance.timeout = config[:read_timeout]
instance
rescue Errno::ETIMEDOUT
raise TimeoutError
end
def initialize(connection)
@connection = connection
end
def connected?
@connection && @connection.connected?
end
def timeout=(timeout)
# Hiredis works with microsecond timeouts
@connection.timeout = Integer(timeout * 1_000_000)
end
def disconnect
@connection.disconnect
@connection = nil
end
def write(command)
@connection.write(command.flatten(1))
rescue Errno::EAGAIN
raise TimeoutError
end
def read
reply = @connection.read
reply = CommandError.new(reply.message) if reply.is_a?(RuntimeError)
reply
rescue Errno::EAGAIN
raise TimeoutError
rescue RuntimeError => err
raise ProtocolError.new(err.message)
end
end
end
end
Redis::Connection.drivers << Redis::Connection::Hiredis
class Redis
module Connection
# Store a list of loaded connection drivers in the Connection module.
# Redis::Client uses the last required driver by default, and will be aware
# of the loaded connection drivers if the user chooses to override the
# default connection driver.
def self.drivers
@drivers ||= []
end
end
end
require_relative "registry"
require_relative "command_helper"
require_relative "../errors"
require "socket"
require "timeout"
begin
require "openssl"
rescue LoadError
# Not all systems have OpenSSL support
end
class Redis
module Connection
module SocketMixin
CRLF = "\r\n".freeze
# Exceptions raised during non-blocking I/O ops that require retrying the op
NBIO_EXCEPTIONS = [Errno::EWOULDBLOCK, Errno::EAGAIN]
NBIO_EXCEPTIONS << IO::WaitReadable if RUBY_VERSION >= "1.9.3"
def initialize(*args)
super(*args)
@timeout = @write_timeout = nil
@buffer = ""
end
def timeout=(timeout)
if timeout && timeout > 0
@timeout = timeout
else
@timeout = nil
end
end
def write_timeout=(timeout)
if timeout && timeout > 0
@write_timeout = timeout
else
@write_timeout = nil
end
end
def read(nbytes)
result = @buffer.slice!(0, nbytes)
while result.bytesize < nbytes
result << _read_from_socket(nbytes - result.bytesize)
end
result
end
def gets
crlf = nil
while (crlf = @buffer.index(CRLF)) == nil
@buffer << _read_from_socket(1024)
end
@buffer.slice!(0, crlf + CRLF.bytesize)
end
def _read_from_socket(nbytes)
begin
read_nonblock(nbytes)
rescue *NBIO_EXCEPTIONS
if IO.select([self], nil, nil, @timeout)
retry
else
raise Redis::TimeoutError
end
end
rescue EOFError
raise Errno::ECONNRESET
end
# UNIXSocket and TCPSocket don't support write timeouts
def write(*args)
Timeout.timeout(@write_timeout, TimeoutError) { super }
end
end
if defined?(RUBY_ENGINE) && RUBY_ENGINE == "jruby"
require "timeout"
class TCPSocket < ::TCPSocket
include SocketMixin
def self.connect(host, port, timeout)
Timeout.timeout(timeout) do
sock = new(host, port)
sock
end
rescue Timeout::Error
raise TimeoutError
end
end
if defined?(::UNIXSocket)
class UNIXSocket < ::UNIXSocket
include SocketMixin
def self.connect(path, timeout)
Timeout.timeout(timeout) do
sock = new(path)
sock
end
rescue Timeout::Error
raise TimeoutError
end
# JRuby raises Errno::EAGAIN on #read_nonblock even when IO.select
# says it is readable (1.6.6, in both 1.8 and 1.9 mode).
# Use the blocking #readpartial method instead.
def _read_from_socket(nbytes)
readpartial(nbytes)
rescue EOFError
raise Errno::ECONNRESET
end
end
end
else
class TCPSocket < ::Socket
include SocketMixin
def self.connect_addrinfo(ai, port, timeout)
sock = new(::Socket.const_get(ai[0]), Socket::SOCK_STREAM, 0)
sockaddr = ::Socket.pack_sockaddr_in(port, ai[3])
begin
sock.connect_nonblock(sockaddr)
rescue Errno::EINPROGRESS
if IO.select(nil, [sock], nil, timeout) == nil
raise TimeoutError
end
begin
sock.connect_nonblock(sockaddr)
rescue Errno::EISCONN
end
end
sock
end
def self.connect(host, port, timeout)
# Don't pass AI_ADDRCONFIG as flag to getaddrinfo(3)
#
# From the man page for getaddrinfo(3):
#
# If hints.ai_flags includes the AI_ADDRCONFIG flag, then IPv4
# addresses are returned in the list pointed to by res only if the
# local system has at least one IPv4 address configured, and IPv6
# addresses are returned only if the local system has at least one
# IPv6 address configured. The loopback address is not considered
# for this case as valid as a configured address.
#
# We do want the IPv6 loopback address to be returned if applicable,
# even if it is the only configured IPv6 address on the machine.
# Also see: https://github.com/redis/redis-rb/pull/394.
addrinfo = ::Socket.getaddrinfo(host, nil, Socket::AF_UNSPEC, Socket::SOCK_STREAM)
# From the man page for getaddrinfo(3):
#
# Normally, the application should try using the addresses in the
# order in which they are returned. The sorting function used
# within getaddrinfo() is defined in RFC 3484 [...].
#
addrinfo.each_with_index do |ai, i|
begin
return connect_addrinfo(ai, port, timeout)
rescue SystemCallError
# Raise if this was our last attempt.
raise if addrinfo.length == i+1
end
end
end
end
class UNIXSocket < ::Socket
include SocketMixin
def self.connect(path, timeout)
sock = new(::Socket::AF_UNIX, Socket::SOCK_STREAM, 0)
sockaddr = ::Socket.pack_sockaddr_un(path)
begin
sock.connect_nonblock(sockaddr)
rescue Errno::EINPROGRESS
if IO.select(nil, [sock], nil, timeout) == nil
raise TimeoutError
end
begin
sock.connect_nonblock(sockaddr)
rescue Errno::EISCONN
end
end
sock
end
end
end
if defined?(OpenSSL)
class SSLSocket < ::OpenSSL::SSL::SSLSocket
include SocketMixin
def self.connect(host, port, timeout, ssl_params)
# Note: this is using Redis::Connection::TCPSocket
tcp_sock = TCPSocket.connect(host, port, timeout)
ctx = OpenSSL::SSL::SSLContext.new
ctx.set_params(ssl_params) if ssl_params && !ssl_params.empty?
ssl_sock = new(tcp_sock, ctx)
ssl_sock.hostname = host
ssl_sock.connect
ssl_sock.post_connection_check(host)
ssl_sock
end
end
end
class Ruby
include Redis::Connection::CommandHelper
MINUS = "-".freeze
PLUS = "+".freeze
COLON = ":".freeze
DOLLAR = "$".freeze
ASTERISK = "*".freeze
def self.connect(config)
if config[:scheme] == "unix"
raise ArgumentError, "SSL incompatible with unix sockets" if config[:ssl]
sock = UNIXSocket.connect(config[:path], config[:connect_timeout])
elsif config[:scheme] == "rediss" || config[:ssl]
sock = SSLSocket.connect(config[:host], config[:port], config[:connect_timeout], config[:ssl_params])
else
sock = TCPSocket.connect(config[:host], config[:port], config[:connect_timeout])
end
instance = new(sock)
instance.timeout = config[:timeout]
instance.write_timeout = config[:write_timeout]
instance.set_tcp_keepalive config[:tcp_keepalive]
instance
end
if [:SOL_SOCKET, :SO_KEEPALIVE, :SOL_TCP, :TCP_KEEPIDLE, :TCP_KEEPINTVL, :TCP_KEEPCNT].all?{|c| Socket.const_defined? c}
def set_tcp_keepalive(keepalive)
return unless keepalive.is_a?(Hash)
@sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, true)
@sock.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPIDLE, keepalive[:time])
@sock.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPINTVL, keepalive[:intvl])
@sock.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPCNT, keepalive[:probes])
end
def get_tcp_keepalive
{
:time => @sock.getsockopt(Socket::SOL_TCP, Socket::TCP_KEEPIDLE).int,
:intvl => @sock.getsockopt(Socket::SOL_TCP, Socket::TCP_KEEPINTVL).int,
:probes => @sock.getsockopt(Socket::SOL_TCP, Socket::TCP_KEEPCNT).int,
}
end
else
def set_tcp_keepalive(keepalive)
end
def get_tcp_keepalive
{
}
end
end
def initialize(sock)
@sock = sock
end
def connected?
!! @sock
end
def disconnect
@sock.close
rescue
ensure
@sock = nil
end
def timeout=(timeout)
if @sock.respond_to?(:timeout=)
@sock.timeout = timeout
end
end
def write_timeout=(timeout)
@sock.write_timeout = timeout
end
def write(command)
@sock.write(build_command(command))
end
def read
line = @sock.gets
reply_type = line.slice!(0, 1)
format_reply(reply_type, line)
rescue Errno::EAGAIN
raise TimeoutError
end
def format_reply(reply_type, line)
case reply_type
when MINUS then format_error_reply(line)
when PLUS then format_status_reply(line)
when COLON then format_integer_reply(line)
when DOLLAR then format_bulk_reply(line)
when ASTERISK then format_multi_bulk_reply(line)
else raise ProtocolError.new(reply_type)
end
end
def format_error_reply(line)
CommandError.new(line.strip)
end
def format_status_reply(line)
line.strip
end
def format_integer_reply(line)
line.to_i
end
def format_bulk_reply(line)
bulklen = line.to_i
return if bulklen == -1
reply = encode(@sock.read(bulklen))
@sock.read(2) # Discard CRLF.
reply
end
def format_multi_bulk_reply(line)
n = line.to_i
return if n == -1
Array.new(n) { read }
end
end
end
end
Redis::Connection.drivers << Redis::Connection::Ruby
require_relative "command_helper"
require_relative "registry"
require_relative "../errors"
require "em-synchrony"
require "hiredis/reader"
class Redis
module Connection
class RedisClient < EventMachine::Connection
include EventMachine::Deferrable
attr_accessor :timeout
def post_init
@req = nil
@connected = false
@reader = ::Hiredis::Reader.new
end
def connection_completed
@connected = true
succeed
end
def connected?
@connected
end
def receive_data(data)
@reader.feed(data)
loop do
begin
reply = @reader.gets
rescue RuntimeError => err
@req.fail [:error, ProtocolError.new(err.message)]
break
end
break if reply == false
reply = CommandError.new(reply.message) if reply.is_a?(RuntimeError)
@req.succeed [:reply, reply]
end
end
def read
@req = EventMachine::DefaultDeferrable.new
if @timeout > 0
@req.timeout(@timeout, :timeout)
end
EventMachine::Synchrony.sync @req
end
def send(data)
callback { send_data data }
end
def unbind
@connected = false
if @req
@req.fail [:error, Errno::ECONNRESET]
@req = nil
else
fail
end
end
end
class Synchrony
include Redis::Connection::CommandHelper
def self.connect(config)
if config[:scheme] == "unix"
conn = EventMachine.connect_unix_domain(config[:path], RedisClient)
elsif config[:scheme] == "rediss" || config[:ssl]
raise NotImplementedError, "SSL not supported by synchrony driver"
else
conn = EventMachine.connect(config[:host], config[:port], RedisClient) do |c|
c.pending_connect_timeout = [config[:connect_timeout], 0.1].max
end
end
fiber = Fiber.current
conn.callback { fiber.resume }
conn.errback { fiber.resume :refused }
raise Errno::ECONNREFUSED if Fiber.yield == :refused
instance = new(conn)
instance.timeout = config[:read_timeout]
instance
end
def initialize(connection)
@connection = connection
end
def connected?
@connection && @connection.connected?
end
def timeout=(timeout)
@connection.timeout = timeout
end
def disconnect
@connection.close_connection
@connection = nil
end
def write(command)
@connection.send(build_command(command))
end
def read
type, payload = @connection.read
if type == :reply
payload
elsif type == :error
raise payload
elsif type == :timeout
raise TimeoutError
else
raise "Unknown type #{type.inspect}"
end
end
end
end
end
Redis::Connection.drivers << Redis::Connection::Synchrony
This diff is collapsed.
class Redis
# Base error for all redis-rb errors.
class BaseError < RuntimeError
end
# Raised by the connection when a protocol error occurs.
class ProtocolError < BaseError
def initialize(reply_type)
super(<<-EOS.gsub(/(?:^|\n)\s*/, " "))
Got '#{reply_type}' as initial reply byte.
If you're in a forking environment, such as Unicorn, you need to
connect to Redis after forking.
EOS
end
end
# Raised by the client when command execution returns an error reply.
class CommandError < BaseError
end
# Base error for connection related errors.
class BaseConnectionError < BaseError
end
# Raised when connection to a Redis server cannot be made.
class CannotConnectError < BaseConnectionError
end
# Raised when connection to a Redis server is lost.
class ConnectionError < BaseConnectionError
end
# Raised when performing I/O times out.
class TimeoutError < BaseConnectionError
end
# Raised when the connection was inherited by a child process.
class InheritedError < BaseConnectionError
end
end
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
class Redis
VERSION = "3.3.0"
end
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment