As we reevaluate how to best support and maintain Staging Ref in the future, we encourage development teams using this environment to highlight their use cases in the following issue: https://gitlab.com/gitlab-com/gl-infra/software-delivery/framework/software-delivery-framework-issue-tracker/-/issues/36.

Skip to content
Snippets Groups Projects
Commit 58ceab72 authored by Douwe Maan's avatar Douwe Maan
Browse files

Merge branch '119-remove-gitlab-reference-counter' into 'master'

Remove direct redis integration

Closes #119

See merge request gitlab-org/gitlab-shell!181
parents fa2b35a7 d1c01fe8
No related branches found
No related tags found
No related merge requests found
Showing
with 4 additions and 5297 deletions
REDIS_RB_VERSION=v3.3.3
REDIS_RB_VENDOR_DIR=lib/vendor/redis
PWD=`pwd`
all:
update-redis:
rm -rf $(REDIS_RB_VENDOR_DIR)
git clone -b $(REDIS_RB_VERSION) https://github.com/redis/redis-rb.git $(REDIS_RB_VENDOR_DIR)
rm -rf $(REDIS_RB_VENDOR_DIR)/.git
.PHONY=update-redis
Loading
Loading
@@ -3,17 +3,6 @@
require_relative '../lib/gitlab_init'
require_relative '../lib/gitlab_net'
def ping_redis
print "Send ping to redis server: "
if GitlabNet.new.redis_client.ping
print 'OK'
else
abort 'FAILED'
end
puts "\n"
end
#
# GitLab shell check task
#
Loading
Loading
@@ -30,15 +19,11 @@ begin
check_values = JSON.parse(resp.body)
if check_values.key?('redis')
print 'Redis available via internal API: '
if check_values['redis']
puts 'OK'
else
abort 'FAILED'
end
print 'Redis available via internal API: '
if check_values['redis']
puts 'OK'
else
ping_redis
abort 'FAILED'
end
rescue GitlabNet::ApiUnreachableError
abort "FAILED: Failed to connect to internal API"
Loading
Loading
Loading
Loading
@@ -35,23 +35,6 @@ auth_file: "/home/git/.ssh/authorized_keys"
# Default is hooks in the gitlab-shell directory.
# custom_hooks_dir: "/home/git/gitlab-shell/hooks"
# Redis settings used for pushing commit notices to gitlab
redis:
# host: 127.0.0.1
# port: 6379
# pass: redispass # Allows you to specify the password for Redis
database: 0
socket: /var/run/redis/redis.sock # Comment out this line if you want to use TCP or Sentinel
namespace: resque:gitlab
# sentinels:
# -
# host: 127.0.0.1
# port: 26380
# -
# host: 127.0.0.1
# port: 26381
# Log file.
# Default is gitlab-shell.log in the root directory.
# log_file: "/home/git/gitlab-shell/gitlab-shell.log"
Loading
Loading
Loading
Loading
@@ -13,8 +13,6 @@ def increase_reference_counter(gl_repository, repo_path)
result = GitlabNet.new.pre_receive(gl_repository)
result['reference_counter_increased']
rescue GitlabNet::NotFound
GitlabReferenceCounter.new(repo_path).increase
end
require_relative '../lib/gitlab_custom_hook'
Loading
Loading
Loading
Loading
@@ -34,14 +34,6 @@ class GitlabConfig
@config['http_settings'] ||= {}
end
def redis
@config['redis'] ||= {}
end
def redis_namespace
redis['namespace'] || 'resque:gitlab'
end
def log_file
@config['log_file'] ||= File.join(ROOT_PATH, 'gitlab-shell.log')
end
Loading
Loading
Loading
Loading
@@ -5,7 +5,6 @@ require 'json'
require_relative 'gitlab_config'
require_relative 'gitlab_logger'
require_relative 'gitlab_access'
require_relative 'gitlab_redis'
require_relative 'gitlab_lfs_authentication'
require_relative 'httpunix'
Loading
Loading
@@ -140,30 +139,6 @@ class GitlabNet
JSON.parse(resp.body) if resp.code == '200'
end
def redis_client
redis_config = config.redis
database = redis_config['database'] || 0
params = {
host: redis_config['host'] || '127.0.0.1',
port: redis_config['port'] || 6379,
db: database
}
if redis_config.has_key?('sentinels')
params[:sentinels] = redis_config['sentinels']
.select { |s| s['host'] && s['port'] }
.map { |s| { host: s['host'], port: s['port'] } }
end
if redis_config.has_key?("socket")
params = { path: redis_config['socket'], db: database }
elsif redis_config.has_key?("pass")
params[:password] = redis_config['pass']
end
Redis.new(params)
end
protected
def sanitize_path(repo)
Loading
Loading
require_relative 'gitlab_init'
require_relative 'gitlab_net'
require_relative 'gitlab_reference_counter'
require_relative 'gitlab_metrics'
require 'json'
require 'base64'
Loading
Loading
@@ -32,8 +31,6 @@ class GitlabPostReceive
response['reference_counter_decreased']
rescue GitlabNet::ApiUnreachableError
false
rescue GitlabNet::NotFound
fallback_post_receive
end
protected
Loading
Loading
@@ -95,55 +92,4 @@ class GitlabPostReceive
puts
puts "=" * total_width
end
def update_redis
# Encode changes as base64 so we don't run into trouble with non-UTF-8 input.
changes = Base64.encode64(@changes)
# TODO: Change to `@gl_repository` in next release.
# See https://gitlab.com/gitlab-org/gitlab-shell/merge_requests/130#note_28747613
project_identifier = @gl_repository || @repo_path
queue = "#{config.redis_namespace}:queue:post_receive"
msg = JSON.dump({
'class' => 'PostReceive',
'args' => [project_identifier, @actor, changes],
'jid' => @jid,
'enqueued_at' => Time.now.to_f
})
begin
GitlabNet.new.redis_client.rpush(queue, msg)
true
rescue => e
$stderr.puts "GitLab: An unexpected error occurred in writing to Redis: #{e}"
false
end
end
private
def fallback_post_receive
result = update_redis
begin
broadcast_message = GitlabMetrics.measure("broadcast-message") do
api.broadcast_message
end
if broadcast_message.has_key?("message")
print_broadcast_message(broadcast_message["message"])
end
merge_request_urls = GitlabMetrics.measure("merge-request-urls") do
api.merge_request_urls(@gl_repository, @repo_path, @changes)
end
print_merge_request_links(merge_request_urls)
api.notify_post_receive(gl_repository, repo_path)
rescue GitlabNet::ApiUnreachableError
nil
end
result && GitlabReferenceCounter.new(repo_path).decrease
end
end
$:.unshift(File.expand_path(File.join(File.dirname(__FILE__), 'vendor/redis/lib')))
require 'redis'
require_relative 'gitlab_init'
require_relative 'gitlab_net'
class GitlabReferenceCounter
REFERENCE_EXPIRE_TIME = 600
attr_reader :path, :key
def initialize(path)
@path = path
@key = "git-receive-pack-reference-counter:#{path}"
end
def value
(redis_client.get(key) || 0).to_i
end
def increase
redis_cmd do
redis_client.incr(key)
redis_client.expire(key, REFERENCE_EXPIRE_TIME)
end
end
def decrease
redis_cmd do
current_value = redis_client.decr(key)
if current_value < 0
$logger.warn "Reference counter for #{path} decreased when its value was less than 1. Reseting the counter."
redis_client.del(key)
end
end
end
private
def redis_client
@redis_client ||= GitlabNet.new.redis_client
end
def redis_cmd
begin
yield
true
rescue => e
message = "GitLab: An unexpected error occurred in writing to Redis: #{e}"
$stderr.puts message
$logger.error message
false
end
end
end
This diff is collapsed.
require "redis/errors"
require "socket"
require "cgi"
class Redis
class Client
DEFAULTS = {
:url => lambda { ENV["REDIS_URL"] },
:scheme => "redis",
:host => "127.0.0.1",
:port => 6379,
:path => nil,
:timeout => 5.0,
:password => nil,
:db => 0,
:driver => nil,
:id => nil,
:tcp_keepalive => 0,
:reconnect_attempts => 1,
:inherit_socket => false
}
def options
Marshal.load(Marshal.dump(@options))
end
def scheme
@options[:scheme]
end
def host
@options[:host]
end
def port
@options[:port]
end
def path
@options[:path]
end
def read_timeout
@options[:read_timeout]
end
def connect_timeout
@options[:connect_timeout]
end
def timeout
@options[:read_timeout]
end
def password
@options[:password]
end
def db
@options[:db]
end
def db=(db)
@options[:db] = db.to_i
end
def driver
@options[:driver]
end
def inherit_socket?
@options[:inherit_socket]
end
attr_accessor :logger
attr_reader :connection
attr_reader :command_map
def initialize(options = {})
@options = _parse_options(options)
@reconnect = true
@logger = @options[:logger]
@connection = nil
@command_map = {}
@pending_reads = 0
if options.include?(:sentinels)
@connector = Connector::Sentinel.new(@options)
else
@connector = Connector.new(@options)
end
end
def connect
@pid = Process.pid
# Don't try to reconnect when the connection is fresh
with_reconnect(false) do
establish_connection
call [:auth, password] if password
call [:select, db] if db != 0
call [:client, :setname, @options[:id]] if @options[:id]
@connector.check(self)
end
self
end
def id
@options[:id] || "redis://#{location}/#{db}"
end
def location
path || "#{host}:#{port}"
end
def call(command)
reply = process([command]) { read }
raise reply if reply.is_a?(CommandError)
if block_given?
yield reply
else
reply
end
end
def call_loop(command, timeout = 0)
error = nil
result = with_socket_timeout(timeout) do
process([command]) do
loop do
reply = read
if reply.is_a?(CommandError)
error = reply
break
else
yield reply
end
end
end
end
# Raise error when previous block broke out of the loop.
raise error if error
# Result is set to the value that the provided block used to break.
result
end
def call_pipeline(pipeline)
with_reconnect pipeline.with_reconnect? do
begin
pipeline.finish(call_pipelined(pipeline.commands)).tap do
self.db = pipeline.db if pipeline.db
end
rescue ConnectionError => e
return nil if pipeline.shutdown?
# Assume the pipeline was sent in one piece, but execution of
# SHUTDOWN caused none of the replies for commands that were executed
# prior to it from coming back around.
raise e
end
end
end
def call_pipelined(commands)
return [] if commands.empty?
# The method #ensure_connected (called from #process) reconnects once on
# I/O errors. To make an effort in making sure that commands are not
# executed more than once, only allow reconnection before the first reply
# has been read. When an error occurs after the first reply has been
# read, retrying would re-execute the entire pipeline, thus re-issuing
# already successfully executed commands. To circumvent this, don't retry
# after the first reply has been read successfully.
result = Array.new(commands.size)
reconnect = @reconnect
begin
exception = nil
process(commands) do
result[0] = read
@reconnect = false
(commands.size - 1).times do |i|
reply = read
result[i + 1] = reply
exception = reply if exception.nil? && reply.is_a?(CommandError)
end
end
raise exception if exception
ensure
@reconnect = reconnect
end
result
end
def call_with_timeout(command, timeout, &blk)
with_socket_timeout(timeout) do
call(command, &blk)
end
rescue ConnectionError
retry
end
def call_without_timeout(command, &blk)
call_with_timeout(command, 0, &blk)
end
def process(commands)
logging(commands) do
ensure_connected do
commands.each do |command|
if command_map[command.first]
command = command.dup
command[0] = command_map[command.first]
end
write(command)
end
yield if block_given?
end
end
end
def connected?
!! (connection && connection.connected?)
end
def disconnect
connection.disconnect if connected?
end
def reconnect
disconnect
connect
end
def io
yield
rescue TimeoutError => e1
# Add a message to the exception without destroying the original stack
e2 = TimeoutError.new("Connection timed out")
e2.set_backtrace(e1.backtrace)
raise e2
rescue Errno::ECONNRESET, Errno::EPIPE, Errno::ECONNABORTED, Errno::EBADF, Errno::EINVAL => e
raise ConnectionError, "Connection lost (%s)" % [e.class.name.split("::").last]
end
def read
io do
value = connection.read
@pending_reads -= 1
value
end
end
def write(command)
io do
@pending_reads += 1
connection.write(command)
end
end
def with_socket_timeout(timeout)
connect unless connected?
begin
connection.timeout = timeout
yield
ensure
connection.timeout = self.timeout if connected?
end
end
def without_socket_timeout(&blk)
with_socket_timeout(0, &blk)
end
def with_reconnect(val=true)
begin
original, @reconnect = @reconnect, val
yield
ensure
@reconnect = original
end
end
def without_reconnect(&blk)
with_reconnect(false, &blk)
end
protected
def logging(commands)
return yield unless @logger && @logger.debug?
begin
commands.each do |name, *args|
logged_args = args.map do |a|
case
when a.respond_to?(:inspect) then a.inspect
when a.respond_to?(:to_s) then a.to_s
else
# handle poorly-behaved descendants of BasicObject
klass = a.instance_exec { (class << self; self end).superclass }
"\#<#{klass}:#{a.__id__}>"
end
end
@logger.debug("[Redis] command=#{name.to_s.upcase} args=#{logged_args.join(' ')}")
end
t1 = Time.now
yield
ensure
@logger.debug("[Redis] call_time=%0.2f ms" % ((Time.now - t1) * 1000)) if t1
end
end
def establish_connection
server = @connector.resolve.dup
@options[:host] = server[:host]
@options[:port] = Integer(server[:port]) if server.include?(:port)
@connection = @options[:driver].connect(@options)
@pending_reads = 0
rescue TimeoutError,
Errno::ECONNREFUSED,
Errno::EHOSTDOWN,
Errno::EHOSTUNREACH,
Errno::ENETUNREACH,
Errno::ETIMEDOUT
raise CannotConnectError, "Error connecting to Redis on #{location} (#{$!.class})"
end
def ensure_connected
disconnect if @pending_reads > 0
attempts = 0
begin
attempts += 1
if connected?
unless inherit_socket? || Process.pid == @pid
raise InheritedError,
"Tried to use a connection from a child process without reconnecting. " +
"You need to reconnect to Redis after forking " +
"or set :inherit_socket to true."
end
else
connect
end
yield
rescue BaseConnectionError
disconnect
if attempts <= @options[:reconnect_attempts] && @reconnect
retry
else
raise
end
rescue Exception
disconnect
raise
end
end
def _parse_options(options)
return options if options[:_parsed]
defaults = DEFAULTS.dup
options = options.dup
defaults.keys.each do |key|
# Fill in defaults if needed
if defaults[key].respond_to?(:call)
defaults[key] = defaults[key].call
end
# Symbolize only keys that are needed
options[key] = options[key.to_s] if options.has_key?(key.to_s)
end
url = options[:url] || defaults[:url]
# Override defaults from URL if given
if url
require "uri"
uri = URI(url)
if uri.scheme == "unix"
defaults[:path] = uri.path
elsif uri.scheme == "redis" || uri.scheme == "rediss"
defaults[:scheme] = uri.scheme
defaults[:host] = uri.host if uri.host
defaults[:port] = uri.port if uri.port
defaults[:password] = CGI.unescape(uri.password) if uri.password
defaults[:db] = uri.path[1..-1].to_i if uri.path
defaults[:role] = :master
else
raise ArgumentError, "invalid uri scheme '#{uri.scheme}'"
end
defaults[:ssl] = true if uri.scheme == "rediss"
end
# Use default when option is not specified or nil
defaults.keys.each do |key|
options[key] = defaults[key] if options[key].nil?
end
if options[:path]
# Unix socket
options[:scheme] = "unix"
options.delete(:host)
options.delete(:port)
else
# TCP socket
options[:host] = options[:host].to_s
options[:port] = options[:port].to_i
end
if options.has_key?(:timeout)
options[:connect_timeout] ||= options[:timeout]
options[:read_timeout] ||= options[:timeout]
options[:write_timeout] ||= options[:timeout]
end
options[:connect_timeout] = Float(options[:connect_timeout])
options[:read_timeout] = Float(options[:read_timeout])
options[:write_timeout] = Float(options[:write_timeout])
options[:db] = options[:db].to_i
options[:driver] = _parse_driver(options[:driver]) || Connection.drivers.last
case options[:tcp_keepalive]
when Hash
[:time, :intvl, :probes].each do |key|
unless options[:tcp_keepalive][key].is_a?(Integer)
raise "Expected the #{key.inspect} key in :tcp_keepalive to be an Integer"
end
end
when Integer
if options[:tcp_keepalive] >= 60
options[:tcp_keepalive] = {:time => options[:tcp_keepalive] - 20, :intvl => 10, :probes => 2}
elsif options[:tcp_keepalive] >= 30
options[:tcp_keepalive] = {:time => options[:tcp_keepalive] - 10, :intvl => 5, :probes => 2}
elsif options[:tcp_keepalive] >= 5
options[:tcp_keepalive] = {:time => options[:tcp_keepalive] - 2, :intvl => 2, :probes => 1}
end
end
options[:_parsed] = true
options
end
def _parse_driver(driver)
driver = driver.to_s if driver.is_a?(Symbol)
if driver.kind_of?(String)
begin
require "redis/connection/#{driver}"
driver = Connection.const_get(driver.capitalize)
rescue LoadError, NameError
raise RuntimeError, "Cannot load driver #{driver.inspect}"
end
end
driver
end
class Connector
def initialize(options)
@options = options.dup
end
def resolve
@options
end
def check(client)
end
class Sentinel < Connector
def initialize(options)
super(options)
@options[:password] = DEFAULTS.fetch(:password)
@options[:db] = DEFAULTS.fetch(:db)
@sentinels = @options.delete(:sentinels).dup
@role = @options.fetch(:role, "master").to_s
@master = @options[:host]
end
def check(client)
# Check the instance is really of the role we are looking for.
# We can't assume the command is supported since it was introduced
# recently and this client should work with old stuff.
begin
role = client.call([:role])[0]
rescue Redis::CommandError
# Assume the test is passed if we can't get a reply from ROLE...
role = @role
end
if role != @role
client.disconnect
raise ConnectionError, "Instance role mismatch. Expected #{@role}, got #{role}."
end
end
def resolve
result = case @role
when "master"
resolve_master
when "slave"
resolve_slave
else
raise ArgumentError, "Unknown instance role #{@role}"
end
result || (raise ConnectionError, "Unable to fetch #{@role} via Sentinel.")
end
def sentinel_detect
@sentinels.each do |sentinel|
client = Client.new(@options.merge({
:host => sentinel[:host],
:port => sentinel[:port],
:reconnect_attempts => 0,
}))
begin
if result = yield(client)
# This sentinel responded. Make sure we ask it first next time.
@sentinels.delete(sentinel)
@sentinels.unshift(sentinel)
return result
end
rescue BaseConnectionError
ensure
client.disconnect
end
end
raise CannotConnectError, "No sentinels available."
end
def resolve_master
sentinel_detect do |client|
if reply = client.call(["sentinel", "get-master-addr-by-name", @master])
{:host => reply[0], :port => reply[1]}
end
end
end
def resolve_slave
sentinel_detect do |client|
if reply = client.call(["sentinel", "slaves", @master])
slave = Hash[*reply.sample]
{:host => slave.fetch("ip"), :port => slave.fetch("port")}
end
end
end
end
end
end
end
require "redis/connection/registry"
# If a connection driver was required before this file, the array
# Redis::Connection.drivers will contain one or more classes. The last driver
# in this array will be used as default driver. If this array is empty, we load
# the plain Ruby driver as our default. Another driver can be required at a
# later point in time, causing it to be the last element of the #drivers array
# and therefore be chosen by default.
require "redis/connection/ruby" if Redis::Connection.drivers.empty?
\ No newline at end of file
class Redis
module Connection
module CommandHelper
COMMAND_DELIMITER = "\r\n"
def build_command(args)
command = [nil]
args.each do |i|
if i.is_a? Array
i.each do |j|
j = j.to_s
command << "$#{j.bytesize}"
command << j
end
else
i = i.to_s
command << "$#{i.bytesize}"
command << i
end
end
command[0] = "*#{(command.length - 1) / 2}"
# Trailing delimiter
command << ""
command.join(COMMAND_DELIMITER)
end
protected
if defined?(Encoding::default_external)
def encode(string)
string.force_encoding(Encoding::default_external)
end
else
def encode(string)
string
end
end
end
end
end
require "redis/connection/registry"
require "redis/errors"
require "hiredis/connection"
require "timeout"
class Redis
module Connection
class Hiredis
def self.connect(config)
connection = ::Hiredis::Connection.new
connect_timeout = (config.fetch(:connect_timeout, 0) * 1_000_000).to_i
if config[:scheme] == "unix"
connection.connect_unix(config[:path], connect_timeout)
elsif config[:scheme] == "rediss" || config[:ssl]
raise NotImplementedError, "SSL not supported by hiredis driver"
else
connection.connect(config[:host], config[:port], connect_timeout)
end
instance = new(connection)
instance.timeout = config[:read_timeout]
instance
rescue Errno::ETIMEDOUT
raise TimeoutError
end
def initialize(connection)
@connection = connection
end
def connected?
@connection && @connection.connected?
end
def timeout=(timeout)
# Hiredis works with microsecond timeouts
@connection.timeout = Integer(timeout * 1_000_000)
end
def disconnect
@connection.disconnect
@connection = nil
end
def write(command)
@connection.write(command.flatten(1))
rescue Errno::EAGAIN
raise TimeoutError
end
def read
reply = @connection.read
reply = CommandError.new(reply.message) if reply.is_a?(RuntimeError)
reply
rescue Errno::EAGAIN
raise TimeoutError
rescue RuntimeError => err
raise ProtocolError.new(err.message)
end
end
end
end
Redis::Connection.drivers << Redis::Connection::Hiredis
class Redis
module Connection
# Store a list of loaded connection drivers in the Connection module.
# Redis::Client uses the last required driver by default, and will be aware
# of the loaded connection drivers if the user chooses to override the
# default connection driver.
def self.drivers
@drivers ||= []
end
end
end
require "redis/connection/registry"
require "redis/connection/command_helper"
require "redis/errors"
require "socket"
require "timeout"
begin
require "openssl"
rescue LoadError
# Not all systems have OpenSSL support
end
if RUBY_VERSION < "1.9.3"
class String
# Ruby 1.8.7 does not have byteslice, but it handles encodings differently anyway.
# We can simply slice the string, which is a byte array there.
def byteslice(*args)
slice(*args)
end
end
end
class Redis
module Connection
module SocketMixin
CRLF = "\r\n".freeze
# Exceptions raised during non-blocking I/O ops that require retrying the op
if RUBY_VERSION >= "1.9.3"
NBIO_READ_EXCEPTIONS = [IO::WaitReadable]
NBIO_WRITE_EXCEPTIONS = [IO::WaitWritable]
else
NBIO_READ_EXCEPTIONS = [Errno::EWOULDBLOCK, Errno::EAGAIN]
NBIO_WRITE_EXCEPTIONS = [Errno::EWOULDBLOCK, Errno::EAGAIN]
end
def initialize(*args)
super(*args)
@timeout = @write_timeout = nil
@buffer = ""
end
def timeout=(timeout)
if timeout && timeout > 0
@timeout = timeout
else
@timeout = nil
end
end
def write_timeout=(timeout)
if timeout && timeout > 0
@write_timeout = timeout
else
@write_timeout = nil
end
end
def read(nbytes)
result = @buffer.slice!(0, nbytes)
while result.bytesize < nbytes
result << _read_from_socket(nbytes - result.bytesize)
end
result
end
def gets
crlf = nil
while (crlf = @buffer.index(CRLF)) == nil
@buffer << _read_from_socket(1024)
end
@buffer.slice!(0, crlf + CRLF.bytesize)
end
def _read_from_socket(nbytes)
begin
read_nonblock(nbytes)
rescue *NBIO_READ_EXCEPTIONS
if IO.select([self], nil, nil, @timeout)
retry
else
raise Redis::TimeoutError
end
rescue *NBIO_WRITE_EXCEPTIONS
if IO.select(nil, [self], nil, @timeout)
retry
else
raise Redis::TimeoutError
end
end
rescue EOFError
raise Errno::ECONNRESET
end
def _write_to_socket(data)
begin
write_nonblock(data)
rescue *NBIO_WRITE_EXCEPTIONS
if IO.select(nil, [self], nil, @write_timeout)
retry
else
raise Redis::TimeoutError
end
rescue *NBIO_READ_EXCEPTIONS
if IO.select([self], nil, nil, @write_timeout)
retry
else
raise Redis::TimeoutError
end
end
rescue EOFError
raise Errno::ECONNRESET
end
def write(data)
return super(data) unless @write_timeout
length = data.bytesize
total_count = 0
loop do
count = _write_to_socket(data)
total_count += count
return total_count if total_count >= length
data = data.byteslice(count..-1)
end
end
end
if defined?(RUBY_ENGINE) && RUBY_ENGINE == "jruby"
require "timeout"
class TCPSocket < ::TCPSocket
include SocketMixin
def self.connect(host, port, timeout)
Timeout.timeout(timeout) do
sock = new(host, port)
sock
end
rescue Timeout::Error
raise TimeoutError
end
end
if defined?(::UNIXSocket)
class UNIXSocket < ::UNIXSocket
include SocketMixin
def self.connect(path, timeout)
Timeout.timeout(timeout) do
sock = new(path)
sock
end
rescue Timeout::Error
raise TimeoutError
end
# JRuby raises Errno::EAGAIN on #read_nonblock even when IO.select
# says it is readable (1.6.6, in both 1.8 and 1.9 mode).
# Use the blocking #readpartial method instead.
def _read_from_socket(nbytes)
readpartial(nbytes)
rescue EOFError
raise Errno::ECONNRESET
end
end
end
else
class TCPSocket < ::Socket
include SocketMixin
def self.connect_addrinfo(ai, port, timeout)
sock = new(::Socket.const_get(ai[0]), Socket::SOCK_STREAM, 0)
sockaddr = ::Socket.pack_sockaddr_in(port, ai[3])
begin
sock.connect_nonblock(sockaddr)
rescue Errno::EINPROGRESS
if IO.select(nil, [sock], nil, timeout) == nil
raise TimeoutError
end
begin
sock.connect_nonblock(sockaddr)
rescue Errno::EISCONN
end
end
sock
end
def self.connect(host, port, timeout)
# Don't pass AI_ADDRCONFIG as flag to getaddrinfo(3)
#
# From the man page for getaddrinfo(3):
#
# If hints.ai_flags includes the AI_ADDRCONFIG flag, then IPv4
# addresses are returned in the list pointed to by res only if the
# local system has at least one IPv4 address configured, and IPv6
# addresses are returned only if the local system has at least one
# IPv6 address configured. The loopback address is not considered
# for this case as valid as a configured address.
#
# We do want the IPv6 loopback address to be returned if applicable,
# even if it is the only configured IPv6 address on the machine.
# Also see: https://github.com/redis/redis-rb/pull/394.
addrinfo = ::Socket.getaddrinfo(host, nil, Socket::AF_UNSPEC, Socket::SOCK_STREAM)
# From the man page for getaddrinfo(3):
#
# Normally, the application should try using the addresses in the
# order in which they are returned. The sorting function used
# within getaddrinfo() is defined in RFC 3484 [...].
#
addrinfo.each_with_index do |ai, i|
begin
return connect_addrinfo(ai, port, timeout)
rescue SystemCallError
# Raise if this was our last attempt.
raise if addrinfo.length == i+1
end
end
end
end
class UNIXSocket < ::Socket
include SocketMixin
def self.connect(path, timeout)
sock = new(::Socket::AF_UNIX, Socket::SOCK_STREAM, 0)
sockaddr = ::Socket.pack_sockaddr_un(path)
begin
sock.connect_nonblock(sockaddr)
rescue Errno::EINPROGRESS
if IO.select(nil, [sock], nil, timeout) == nil
raise TimeoutError
end
begin
sock.connect_nonblock(sockaddr)
rescue Errno::EISCONN
end
end
sock
end
end
end
if defined?(OpenSSL)
class SSLSocket < ::OpenSSL::SSL::SSLSocket
include SocketMixin
def self.connect(host, port, timeout, ssl_params)
# Note: this is using Redis::Connection::TCPSocket
tcp_sock = TCPSocket.connect(host, port, timeout)
ctx = OpenSSL::SSL::SSLContext.new
ctx.set_params(ssl_params) if ssl_params && !ssl_params.empty?
ssl_sock = new(tcp_sock, ctx)
ssl_sock.hostname = host
ssl_sock.connect
ssl_sock.post_connection_check(host)
ssl_sock
end
end
end
class Ruby
include Redis::Connection::CommandHelper
MINUS = "-".freeze
PLUS = "+".freeze
COLON = ":".freeze
DOLLAR = "$".freeze
ASTERISK = "*".freeze
def self.connect(config)
if config[:scheme] == "unix"
raise ArgumentError, "SSL incompatible with unix sockets" if config[:ssl]
sock = UNIXSocket.connect(config[:path], config[:connect_timeout])
elsif config[:scheme] == "rediss" || config[:ssl]
raise ArgumentError, "This library does not support SSL on Ruby < 1.9" if RUBY_VERSION < "1.9.3"
sock = SSLSocket.connect(config[:host], config[:port], config[:connect_timeout], config[:ssl_params])
else
sock = TCPSocket.connect(config[:host], config[:port], config[:connect_timeout])
end
instance = new(sock)
instance.timeout = config[:timeout]
instance.write_timeout = config[:write_timeout]
instance.set_tcp_keepalive config[:tcp_keepalive]
instance
end
if [:SOL_SOCKET, :SO_KEEPALIVE, :SOL_TCP, :TCP_KEEPIDLE, :TCP_KEEPINTVL, :TCP_KEEPCNT].all?{|c| Socket.const_defined? c}
def set_tcp_keepalive(keepalive)
return unless keepalive.is_a?(Hash)
@sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, true)
@sock.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPIDLE, keepalive[:time])
@sock.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPINTVL, keepalive[:intvl])
@sock.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPCNT, keepalive[:probes])
end
def get_tcp_keepalive
{
:time => @sock.getsockopt(Socket::SOL_TCP, Socket::TCP_KEEPIDLE).int,
:intvl => @sock.getsockopt(Socket::SOL_TCP, Socket::TCP_KEEPINTVL).int,
:probes => @sock.getsockopt(Socket::SOL_TCP, Socket::TCP_KEEPCNT).int,
}
end
else
def set_tcp_keepalive(keepalive)
end
def get_tcp_keepalive
{
}
end
end
def initialize(sock)
@sock = sock
end
def connected?
!! @sock
end
def disconnect
@sock.close
rescue
ensure
@sock = nil
end
def timeout=(timeout)
if @sock.respond_to?(:timeout=)
@sock.timeout = timeout
end
end
def write_timeout=(timeout)
@sock.write_timeout = timeout
end
def write(command)
@sock.write(build_command(command))
end
def read
line = @sock.gets
reply_type = line.slice!(0, 1)
format_reply(reply_type, line)
rescue Errno::EAGAIN
raise TimeoutError
end
def format_reply(reply_type, line)
case reply_type
when MINUS then format_error_reply(line)
when PLUS then format_status_reply(line)
when COLON then format_integer_reply(line)
when DOLLAR then format_bulk_reply(line)
when ASTERISK then format_multi_bulk_reply(line)
else raise ProtocolError.new(reply_type)
end
end
def format_error_reply(line)
CommandError.new(line.strip)
end
def format_status_reply(line)
line.strip
end
def format_integer_reply(line)
line.to_i
end
def format_bulk_reply(line)
bulklen = line.to_i
return if bulklen == -1
reply = encode(@sock.read(bulklen))
@sock.read(2) # Discard CRLF.
reply
end
def format_multi_bulk_reply(line)
n = line.to_i
return if n == -1
Array.new(n) { read }
end
end
end
end
Redis::Connection.drivers << Redis::Connection::Ruby
require "redis/connection/command_helper"
require "redis/connection/registry"
require "redis/errors"
require "em-synchrony"
require "hiredis/reader"
class Redis
module Connection
class RedisClient < EventMachine::Connection
include EventMachine::Deferrable
attr_accessor :timeout
def post_init
@req = nil
@connected = false
@reader = ::Hiredis::Reader.new
end
def connection_completed
@connected = true
succeed
end
def connected?
@connected
end
def receive_data(data)
@reader.feed(data)
loop do
begin
reply = @reader.gets
rescue RuntimeError => err
@req.fail [:error, ProtocolError.new(err.message)]
break
end
break if reply == false
reply = CommandError.new(reply.message) if reply.is_a?(RuntimeError)
@req.succeed [:reply, reply]
end
end
def read
@req = EventMachine::DefaultDeferrable.new
if @timeout > 0
@req.timeout(@timeout, :timeout)
end
EventMachine::Synchrony.sync @req
end
def send(data)
callback { send_data data }
end
def unbind
@connected = false
if @req
@req.fail [:error, Errno::ECONNRESET]
@req = nil
else
fail
end
end
end
class Synchrony
include Redis::Connection::CommandHelper
def self.connect(config)
if config[:scheme] == "unix"
conn = EventMachine.connect_unix_domain(config[:path], RedisClient)
elsif config[:scheme] == "rediss" || config[:ssl]
raise NotImplementedError, "SSL not supported by synchrony driver"
else
conn = EventMachine.connect(config[:host], config[:port], RedisClient) do |c|
c.pending_connect_timeout = [config[:connect_timeout], 0.1].max
end
end
fiber = Fiber.current
conn.callback { fiber.resume }
conn.errback { fiber.resume :refused }
raise Errno::ECONNREFUSED if Fiber.yield == :refused
instance = new(conn)
instance.timeout = config[:read_timeout]
instance
end
def initialize(connection)
@connection = connection
end
def connected?
@connection && @connection.connected?
end
def timeout=(timeout)
@connection.timeout = timeout
end
def disconnect
@connection.close_connection
@connection = nil
end
def write(command)
@connection.send(build_command(command))
end
def read
type, payload = @connection.read
if type == :reply
payload
elsif type == :error
raise payload
elsif type == :timeout
raise TimeoutError
else
raise "Unknown type #{type.inspect}"
end
end
end
end
end
Redis::Connection.drivers << Redis::Connection::Synchrony
This diff is collapsed.
class Redis
# Base error for all redis-rb errors.
class BaseError < RuntimeError
end
# Raised by the connection when a protocol error occurs.
class ProtocolError < BaseError
def initialize(reply_type)
super(<<-EOS.gsub(/(?:^|\n)\s*/, " "))
Got '#{reply_type}' as initial reply byte.
If you're in a forking environment, such as Unicorn, you need to
connect to Redis after forking.
EOS
end
end
# Raised by the client when command execution returns an error reply.
class CommandError < BaseError
end
# Base error for connection related errors.
class BaseConnectionError < BaseError
end
# Raised when connection to a Redis server cannot be made.
class CannotConnectError < BaseConnectionError
end
# Raised when connection to a Redis server is lost.
class ConnectionError < BaseConnectionError
end
# Raised when performing I/O times out.
class TimeoutError < BaseConnectionError
end
# Raised when the connection was inherited by a child process.
class InheritedError < BaseConnectionError
end
end
require 'zlib'
class Redis
class HashRing
POINTS_PER_SERVER = 160 # this is the default in libmemcached
attr_reader :ring, :sorted_keys, :replicas, :nodes
# nodes is a list of objects that have a proper to_s representation.
# replicas indicates how many virtual points should be used pr. node,
# replicas are required to improve the distribution.
def initialize(nodes=[], replicas=POINTS_PER_SERVER)
@replicas = replicas
@ring = {}
@nodes = []
@sorted_keys = []
nodes.each do |node|
add_node(node)
end
end
# Adds a `node` to the hash ring (including a number of replicas).
def add_node(node)
@nodes << node
@replicas.times do |i|
key = Zlib.crc32("#{node.id}:#{i}")
raise "Node ID collision" if @ring.has_key?(key)
@ring[key] = node
@sorted_keys << key
end
@sorted_keys.sort!
end
def remove_node(node)
@nodes.reject!{|n| n.id == node.id}
@replicas.times do |i|
key = Zlib.crc32("#{node.id}:#{i}")
@ring.delete(key)
@sorted_keys.reject! {|k| k == key}
end
end
# get the node in the hash ring for this key
def get_node(key)
get_node_pos(key)[0]
end
def get_node_pos(key)
return [nil,nil] if @ring.size == 0
crc = Zlib.crc32(key)
idx = HashRing.binary_search(@sorted_keys, crc)
return [@ring[@sorted_keys[idx]], idx]
end
def iter_nodes(key)
return [nil,nil] if @ring.size == 0
_, pos = get_node_pos(key)
@ring.size.times do |n|
yield @ring[@sorted_keys[(pos+n) % @ring.size]]
end
end
class << self
# gem install RubyInline to use this code
# Native extension to perform the binary search within the hashring.
# There's a pure ruby version below so this is purely optional
# for performance. In testing 20k gets and sets, the native
# binary search shaved about 12% off the runtime (9sec -> 8sec).
begin
require 'inline'
inline do |builder|
builder.c <<-EOM
int binary_search(VALUE ary, unsigned int r) {
int upper = RARRAY_LEN(ary) - 1;
int lower = 0;
int idx = 0;
while (lower <= upper) {
idx = (lower + upper) / 2;
VALUE continuumValue = RARRAY_PTR(ary)[idx];
unsigned int l = NUM2UINT(continuumValue);
if (l == r) {
return idx;
}
else if (l > r) {
upper = idx - 1;
}
else {
lower = idx + 1;
}
}
if (upper < 0) {
upper = RARRAY_LEN(ary) - 1;
}
return upper;
}
EOM
end
rescue Exception
# Find the closest index in HashRing with value <= the given value
def binary_search(ary, value, &block)
upper = ary.size - 1
lower = 0
idx = 0
while(lower <= upper) do
idx = (lower + upper) / 2
comp = ary[idx] <=> value
if comp == 0
return idx
elsif comp > 0
upper = idx - 1
else
lower = idx + 1
end
end
if upper < 0
upper = ary.size - 1
end
return upper
end
end
end
end
end
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment