복붙노트

[REDIS] 레디 스 + ActionController :: 라이브 스레드가 죽어하지

REDIS

레디 스 + ActionController :: 라이브 스레드가 죽어하지

배경 : 우리는 우리의 기존의 레일 응용 프로그램 중 하나에 채팅 기능을 구축했습니다. 우리는 :: 라이브 모듈을 새로운 ActionController을 사용하고 (생산 Nginx와 함께) 푸마를 실행하고, 레디 스를 통해 메시지를 구독하고 있습니다. 우리는 비동기 적으로 연결을 설정하는 EventSource 클라이언트 측을 사용하고 있습니다.

문제 요약 : 연결이 종료 될 때 스레드는 결코 죽어되지 않습니다.

예를 들어, 멀리 사용자의 탐색은, 브라우저를 닫아야합니다, 또는 응용 프로그램 내에서 다른 페이지로 이동, 새로운 스레드 (예상대로) 양산되지만, 이전 살고 계속됩니다.

내가 현재 그것을보고 같은 문제는 이러한 상황이 발생했을 때 뭔가 시도가 브라우저 일단 결코 일어나지 않을 것이다이 깨진 스트림에 쓸 때까지 서버가 브라우저의 끝에있는 연결이 종료되어 있는지 여부를 알 수있는 방법이 없다는 것입니다 원래 페이지에서 멀리 이동했습니다.

이 문제는 GitHub의에 문서화하는 것, 그리고 유사한 질문 (활성 스레드의 수를 점점에 관한) 여기 (꽤 잘 동일한 질문) 여기에 StackOverflow에 요청하고있다.

나는이 게시물에 기반을 마련 할 수있었습니다 유일한 해결책은, 스레드 / 연결 포커의 유형을 구현하는 것입니다. 깨진 연결에 쓰기를 시도하면 스레드가 죽을 수 있도록, 내가 제대로 가까운 연결을 잡을 수있는 IO 오류를 생성합니다. 이는 해당 솔루션에 대한 컨트롤러 코드입니다 :

def events
  response.headers["Content-Type"] = "text/event-stream"

  stream_error = false; # used by flusher thread to determine when to stop

  redis = Redis.new

  # Subscribe to our events
  redis.subscribe("message.create", "message.user_list_update") do |on| 
    on.message do |event, data| # when message is received, write to stream
      response.stream.write("messageType: '#{event}', data: #{data}\n\n")
    end

    # This is the monitor / connection poker thread
    # Periodically poke the connection by attempting to write to the stream
    flusher_thread = Thread.new do
      while !stream_error
        $redis.publish "message.create", "flusher_test"
        sleep 2.seconds
      end
    end
  end 

  rescue IOError
    logger.info "Stream closed"
    stream_error = true;
  ensure
    logger.info "Events action is quitting redis and closing stream!"
    redis.quit
    response.stream.close
end

(참고 : 이벤트 방법은 가입 메소드 호출에 차단받을 것으로 보인다 다른 모든 (스트리밍)의 난이 일반 가정 때문에 제대로 작동합니다.).

(기타 참고 사항 :. 플러 셔 스레드의 개념은 단일 장기 실행 백그라운드 프로세스로 더 의미, 쓰레기 스레드 수집기 같은 비트를 만드는 위의 내 구현의 문제는 새로운 스레드가 무의미 각 연결을 위해 만들어지는 것입니다 누군가를. 이 개념은, 하나의 프로세스처럼 더 그것을해야 구현을 시도하고있는 것은 너무 많이 나는 내가 성공적으로 하나의 백그라운드 프로세스로이를 다시 구현했을 때이 게시물을 업데이트 할 수 있습니다. 설명했듯이.)

이 솔루션의 단점은 우리가 단지 지연 또는 문제가 완전히 해결되지를 적어 한 것입니다. 우리는 여전히 스케일링 관점에서 끔찍한 보인다 아약스와 같은 다른 요청에 추가하여 사용자 당 2 개 스레드를 가지고; 그것은 많은 수의 동시 연결과 더 큰 시스템을 완전히 달성 할 수없는 비현실적 보인다.

나는 중요한 뭔가를 놓친 거지 같은 느낌; 나는 그것이 다소 어려운 그 레일 믿을 그래서 분명히 내가했던 것처럼 사용자 정의 연결 검사기를 구현하지 않고 파괴하는 기능이 찾으십시오.

질문 : 우리는 어떻게 연결 / 스레드가 이러한 '연결 포커', 또는 쓰레기 스레드 수집기로 뭔가 촌스러운를 구현하지 않고 죽을 수 있습니까?

언제나 난 아무것도 탈락 한 경우 알려주세요.

최신 정보 그냥 추가 정보의 비트를 추가합니다 : Huetsch이 끝난 GitHub의에서 SSE가 정상적으로 연결이 (이 경우 서버) 다른 쪽 끝을시키는 폐쇄 된 FIN 패킷을 전송 TCP에 기반하고 있다는 지적이 댓글을 게시 알고 그 연결을 종료하는 것이 안전합니다. Huetsch은 (그런 경우, 레일에 확실히 버그) 브라우저 중 하나가 (?를 EventSource 라이브러리에 아마 버그) 그 패킷을 전송하지 않거나 레일을 잡기되지 않았거나 아무것도를하고 있음을 지적한다. 검색은 계속 ...

또 다른 업데이트 Wireshark를 사용하여, 나는 참으로 FIN 패킷이 전송되는 것을 볼 수 있습니다. 틀림없이, 내가 무엇을 말할 수, 나는 확실히 나는 경우 브라우저에서 EventSource를 사용하여 SSE 연결 및 NO 패킷하여 전송을 설정할 때 브라우저에서 전송되는 FIN 패킷을 감지하지만에서 매우 지식 또는 프로토콜 수준의 물건을 경험하지 않다 그 연결 (더 ​​SSE를 의미가없는)를 제거합니다. 나는 몹시까지 내 TCP 지식에 아니에요 있지만,이 연결이 실제로 제대로 클라이언트가 종료되고 있다는 것을 나에게 표시하는 것; 아마도이 퓨마 또는 레일에 버그가 있음을 나타냅니다.

또 다른 갱신 @JamesBoutcher / boutcheratwest (GitHub의)는 특히. (p)의 가입 방법은 종료 결코 사실에 관해서,이 문제에 관한 레디 스 웹 사이트에서 토론에 저를 지적했다. 해당 사이트의 포스터는 우리가 레일 환경이 클라이언트 측 연결이 종료 될 때 통지를하지, 따라서 실행 할 수없는 적이 있는지, 여기서 발견 한 것과 같은 일을 지적했다. (p)를 구독 취소 방법. 그는위한 시간 제한에 대해 조회한다. (p)를 확인하는 방법합니다 (I 위에서 설명한 연결 포커, 또는 자신의 타임 아웃 제안) 더 나은 솔루션이 될 것 아니에요 불구하고, 내가 잘 작동 생각하는 방법을, 가입 . 이상적으로는 연결 포커 솔루션, 나는 연결이 스트림에 작성하지 않고 다른 쪽 끝에서 닫혀 있는지 여부를 확인하는 방법을 찾아야하고 싶습니다. 당신이 볼 수있는대로, 바로 지금, 나는 내가 돌출이라고 생각하는, 별도로 내 "파고"메시지를 처리하고 지옥 같은 바보 같은 클라이언트 측 코드를 구현해야합니다.

해결법

  1. ==============================

    1.괜찮 작동하는 것 같다 (@teeg에서 많은 것을 차용) 난 그냥 한 해결책은 (실패, 상점 그것을 테스트하지 않았습니다)

    괜찮 작동하는 것 같다 (@teeg에서 많은 것을 차용) 난 그냥 한 해결책은 (실패, 상점 그것을 테스트하지 않았습니다)

    설정 / 초기화 / redis.rb

    $redis = Redis.new(:host => "xxxx.com", :port => 6379)
    
    heartbeat_thread = Thread.new do
      while true
        $redis.publish("heartbeat","thump")
        sleep 30.seconds
      end
    end
    
    at_exit do
      # not sure this is needed, but just in case
      heartbeat_thread.kill
      $redis.quit
    end
    

    그리고 내 컨트롤러에서 :

    def events
        response.headers["Content-Type"] = "text/event-stream"
        redis = Redis.new(:host => "xxxxxxx.com", :port => 6379)
        logger.info "New stream starting, connecting to redis"
        redis.subscribe(['parse.new','heartbeat']) do |on|
          on.message do |event, data|
            if event == 'parse.new'
              response.stream.write("event: parse\ndata: #{data}\n\n")
            elsif event == 'heartbeat'
              response.stream.write("event: heartbeat\ndata: heartbeat\n\n")
            end
          end
        end
      rescue IOError
        logger.info "Stream closed"
      ensure
        logger.info "Stopping stream thread"
        redis.quit
        response.stream.close
      end
    
  2. ==============================

    2.나는 현재 응용 프로그램을 만들고있어 그 ActionController 중심으로 돌아 가지 : 라이브, EventSource 푸마하고 ClientDisconnected를 구출 할 필요가 레일 4.2 대신 IO 오류를 구출의 문제 폐쇄 스트림 등을 발생한 것들에 대해. 예:

    나는 현재 응용 프로그램을 만들고있어 그 ActionController 중심으로 돌아 가지 : 라이브, EventSource 푸마하고 ClientDisconnected를 구출 할 필요가 레일 4.2 대신 IO 오류를 구출의 문제 폐쇄 스트림 등을 발생한 것들에 대해. 예:

    def stream
      #Begin is not required
      twitter_client = Twitter::Streaming::Client.new(config_params) do |obj|
        # Do something
      end
    rescue ClientDisconnected
      # Do something when disconnected
    ensure
      # Do something else to ensure the stream is closed
    end
    

    나는이 편리한이 포럼 게시물에서 팁 (모든 아래쪽에있는 방법을) 발견 : http://railscasts.com/episodes/401-actioncontroller-live?view=comments

  3. ==============================

    3.@ 제임스 Boutcher에 구축, 제가 설정 / 초기화 / redis.rb에서 하트 비트에 만든 단 1 개 스레드를 가지고 그래서,이 명 노동자 클러스터 푸마에 다음을 사용 :

    @ 제임스 Boutcher에 구축, 제가 설정 / 초기화 / redis.rb에서 하트 비트에 만든 단 1 개 스레드를 가지고 그래서,이 명 노동자 클러스터 푸마에 다음을 사용 :

    설정 / puma.rb

    on_worker_boot do |index|
      puts "worker nb #{index.to_s} booting"
      create_heartbeat if index.to_i==0
    end
    
    def create_heartbeat
      puts "creating heartbeat"
      $redis||=Redis.new
      heartbeat = Thread.new do
        ActiveRecord::Base.connection_pool.release_connection
        begin
          while true
            hash={event: "heartbeat",data: "heartbeat"}
            $redis.publish("heartbeat",hash.to_json)
            sleep 20.seconds
          end
        ensure
          #no db connection anyway
        end
      end
    end
    
  4. ==============================

    4.여기에 하트 비트를 사용하지 않는 잠재적 간단한 해결책이다. 많은 연구와 실험 후, 여기에 (쉽게 레일 4에 적응해야한다) 보석 SSE 내가시나 +시나로 사용하고 코드는 다음과 같습니다

    여기에 하트 비트를 사용하지 않는 잠재적 간단한 해결책이다. 많은 연구와 실험 후, 여기에 (쉽게 레일 4에 적응해야한다) 보석 SSE 내가시나 +시나로 사용하고 코드는 다음과 같습니다

    class EventServer < Sinatra::Base
     include Sinatra::SSE
     set :connections, []
     .
     .
     .
     get '/channel/:channel' do
     .
     .
     .
      sse_stream do |out|
        settings.connections << out
        out.callback {
          puts 'Client disconnected from sse';
          settings.connections.delete(out);
        }
      redis.subscribe(channel) do |on|
          on.subscribe do |channel, subscriptions|
            puts "Subscribed to redis ##{channel}\n"
          end
          on.message do |channel, message|
            puts "Message from redis ##{channel}: #{message}\n"
            message = JSON.parse(message)
            .
            .
            .
            if settings.connections.include?(out)
              out.push(message)
            else
              puts 'closing orphaned redis connection'
              redis.unsubscribe
            end
          end
        end
      end
    end
    

    레디 스 연결 블록 on.message 만 (P) / (P) 구독 취소 명령을 신청 받아들입니다. 당신이 취소되면, 레디 스 연결이 더 이상 차단되지 않고 초기 SSE 요청에 의해 인스턴스화 웹 서버 객체에 의해 방출 될 수있다. 브라우저에 레디 스에 메시지 SSE 연결을받을 때 자동으로 더 이상 컬렉션 배열에 존재 클리어하지 않습니다.

  5. ==============================

    5.다음은 레디 스를 차단 종료됩니다 타임 아웃 솔루션입니다. (P)는 전화를 가입하고 사용되지 않는 연결 트레드를 죽일.

    다음은 레디 스를 차단 종료됩니다 타임 아웃 솔루션입니다. (P)는 전화를 가입하고 사용되지 않는 연결 트레드를 죽일.

    class Stream::FixedController < StreamController
      def events
        # Rails reserve a db connection from connection pool for
        # each request, lets put it back into connection pool.
        ActiveRecord::Base.clear_active_connections!
    
        # Last time of any (except heartbeat) activity on stream
        # it mean last time of any message was send from server to client
        # or time of setting new connection
        @last_active = Time.zone.now
    
        # Redis (p)subscribe is blocking request so we need do some trick
        # to prevent it freeze request forever.
        redis.psubscribe("messages:*", 'heartbeat') do |on|
          on.pmessage do |pattern, event, data|
            # capture heartbeat from Redis pub/sub
            if event == 'heartbeat'
              # calculate idle time (in secounds) for this stream connection
              idle_time = (Time.zone.now - @last_active).to_i
    
              # Now we need to relase connection with Redis.(p)subscribe
              # chanel to allow go of any Exception (like connection closed)
              if idle_time > 4.minutes
                # unsubscribe from Redis because of idle time was to long
                # that's all - fix in (almost)one line :)
                redis.punsubscribe
              end
            else
              # save time of this (last) activity
              @last_active = Time.zone.now
            end
            # write to stream - even heartbeat - it's sometimes chance to
            # capture dissconection error before idle_time
            response.stream.write("event: #{event}\ndata: #{data}\n\n")
          end
        end
        # blicking end (no chance to get below this line without unsubscribe)
      rescue IOError
        Logs::Stream.info "Stream closed"
      rescue ClientDisconnected
        Logs::Stream.info "ClientDisconnected"
      rescue ActionController::Live::ClientDisconnected
        Logs::Stream.info "Live::ClientDisconnected"
      ensure
        Logs::Stream.info "Stream ensure close"
        redis.quit
        response.stream.close
      end
    end
    

    당신은 사용 빨강에 있습니다.이 차단 ​​통화를 종료 할 구독 취소 (P). 예외이 휴식 할 수 없습니다.

    이 수정 프로그램에 대한 정보를 내 간단한 응용 프로그램 : https://github.com/piotr-kedziak/redis-subscribe-stream-puma-fix

  6. ==============================

    6.대신 모든 클라이언트에 하트 비트를 전송하는, 그냥 각 연결에 대한 감시를 설정하는 것이 더 쉬울 수 있습니다. [@NeilJewers 덕분]

    대신 모든 클라이언트에 하트 비트를 전송하는, 그냥 각 연결에 대한 감시를 설정하는 것이 더 쉬울 수 있습니다. [@NeilJewers 덕분]

    class Stream::FixedController < StreamController
      def events
        # Rails reserve a db connection from connection pool for
        # each request, lets put it back into connection pool.
        ActiveRecord::Base.clear_active_connections!
    
        redis = Redis.new
    
        watchdog = Doberman::WatchDog.new(:timeout => 20.seconds)
        watchdog.start
    
        # Redis (p)subscribe is blocking request so we need do some trick
        # to prevent it freeze request forever.
        redis.psubscribe("messages:*") do |on|
          on.pmessage do |pattern, event, data|
            begin
              # write to stream - even heartbeat - it's sometimes chance to
              response.stream.write("event: #{event}\ndata: #{data}\n\n")
              watchdog.ping
    
            rescue Doberman::WatchDog::Timeout => e
              raise ClientDisconnected if response.stream.closed?
              watchdog.ping
            end
          end
        end
    
      rescue IOError
      rescue ClientDisconnected
    
      ensure
        response.stream.close
        redis.quit
        watchdog.stop
      end
    end
    
  7. from https://stackoverflow.com/questions/18970458/redis-actioncontrollerlive-threads-not-dying by cc-by-sa and MIT license