복붙노트

[REDIS] 어떻게 비동기 토네이도와 레디 스를 사용할 수 있습니까?

REDIS

어떻게 비동기 토네이도와 레디 스를 사용할 수 있습니까?

내가 비동기 레디 스 토네이도를 사용할 수있는 방법을 찾기 위해 노력하고있어. 나는 토네이도 레디 스를 찾았지만 나는 더 많은 단지 코드의 수율을 추가보다 필요합니다.

나는 다음과 같은 코드가 있습니다 :

import redis
import tornado.web

class WaiterHandler(tornado.web.RequestHandler):

    @tornado.web.asynchronous
    def get(self):
        client = redis.StrictRedis(port=6279)
        pubsub = client.pubsub()
        pubsub.subscribe('test_channel')

        for item in pubsub.listen():
            if item['type'] == 'message':
                print item['channel']
                print item['data']

        self.write(item['data'])
        self.finish()


class GetHandler(tornado.web.RequestHandler):

    def get(self):
        self.write("Hello world")


application = tornado.web.Application([
    (r"/", GetHandler),
    (r"/wait", WaiterHandler),
])

if __name__ == '__main__':
    application.listen(8888)
    print 'running'
    tornado.ioloop.IOLoop.instance().start()

나는 액세스에게 / URL을 가져오고 / 대기에서 보류중인 요청이있는 동안은 "안녕하세요"를 얻을 필요가 없다. 어떻게하니?

해결법

  1. ==============================

    1.는 IO 루프를 차단하는 것 같이, 주 토네이도 스레드에서 레디 스 펍 / 서브을 사용하지 말아야합니다. 당신은 메인 스레드에서 웹 클라이언트에서 롱 폴링을 처리 할 수 ​​있습니다,하지만 당신은 레디 스 청취를위한 별도의 스레드를 만들어야합니다. 그런 다음 ioloop.add_callback 사용할 수 있습니다 () 및 / 또는 메시지를받을 때 threading.Queue는 메인 쓰레드와 통신 할 수 있습니다.

    는 IO 루프를 차단하는 것 같이, 주 토네이도 스레드에서 레디 스 펍 / 서브을 사용하지 말아야합니다. 당신은 메인 스레드에서 웹 클라이언트에서 롱 폴링을 처리 할 수 ​​있습니다,하지만 당신은 레디 스 청취를위한 별도의 스레드를 만들어야합니다. 그런 다음 ioloop.add_callback 사용할 수 있습니다 () 및 / 또는 메시지를받을 때 threading.Queue는 메인 쓰레드와 통신 할 수 있습니다.

  2. ==============================

    2.당신은 토네이도 IOLoop 호환 레디 스 클라이언트를 사용해야합니다.

    당신은 토네이도 IOLoop 호환 레디 스 클라이언트를 사용해야합니다.

    가능한 그들 중 몇 가지가 있습니다, toredis, brukva 등

    여기 toredis에서 pubsub의 예입니다 : https://github.com/mrjoes/toredis/blob/master/tests/test_handler.py

  3. ==============================

    3.파이썬> = 3.3, 나는 aioredis를 사용하도록 권합니다. 나는 아래의 코드를 테스트하지 않았다하지만 그런 일해야한다 :

    파이썬> = 3.3, 나는 aioredis를 사용하도록 권합니다. 나는 아래의 코드를 테스트하지 않았다하지만 그런 일해야한다 :

    import redis
    import tornado.web
    from tornado.web import RequestHandler
    
    import aioredis
    import asyncio
    from aioredis.pubsub import Receiver
    
    
    class WaiterHandler(tornado.web.RequestHandler):
    
        @tornado.web.asynchronous
        def get(self):
            client = await aioredis.create_redis((host, 6279), encoding="utf-8", loop=IOLoop.instance().asyncio_loop)
    
            ch = redis.channels['test_channel']
            result = None
            while await ch.wait_message():
                item = await ch.get()
                if item['type'] == 'message':
                    print item['channel']
                    print item['data']
                    result = item['data']
    
            self.write(result)
            self.finish()
    
    
    class GetHandler(tornado.web.RequestHandler):
    
        def get(self):
            self.write("Hello world")
    
    
    application = tornado.web.Application([
        (r"/", GetHandler),
        (r"/wait", WaiterHandler),
    ])
    
    if __name__ == '__main__':
        print 'running'
        tornado.ioloop.IOLoop.configure('tornado.platform.asyncio.AsyncIOLoop')
        server = tornado.httpserver.HTTPServer(application)
        server.bind(8888)
        # zero means creating as many processes as there are cores.
        server.start(0)
        tornado.ioloop.IOLoop.instance().start()
    
  4. ==============================

    4.좋아, 여기에 내가 GET 요청 함께 할 것입니다 방법을 내 예입니다.

    좋아, 여기에 내가 GET 요청 함께 할 것입니다 방법을 내 예입니다.

    나는 두 가지 주요 구성 요소를 추가 :

    제 로컬리스트 객체에 새로운 메시지를 추가하는 간단한 나사 pubsub 수신기이다. 정기적 목록에서 읽는 것처럼 당신이 리스너 스레드에서 읽을 수 있도록 또한, 클래스에리스트 접근을 추가했다. 지금까지 당신의 WebRequest 클래스에 관한 한, 당신은 단지 로컬 목록 개체에서 데이터를 읽는. 이 수익률은 즉시 접수 및 처리중인 요청을 완료 또는 미래에서 현재 요청을 차단하지 않습니다.

    class OpenChannel(threading.Thread):
        def __init__(self, channel, host = None, port = None):
            threading.Thread.__init__(self)
            self.lock = threading.Lock()
            self.redis = redis.StrictRedis(host = host or 'localhost', port = port or 6379)
            self.pubsub = self.redis.pubsub()
            self.pubsub.subscribe(channel)
    
            self.output = []
    
        # lets implement basic getter methods on self.output, so you can access it like a regular list
        def __getitem__(self, item):
            with self.lock:
                return self.output[item]
    
        def __getslice__(self, start, stop = None, step = None):
            with self.lock:
                return self.output[start:stop:step]
    
        def __str__(self):
            with self.lock:
                return self.output.__str__()
    
        # thread loop
        def run(self):
            for message in self.pubsub.listen():
                with self.lock:
                    self.output.append(message['data'])
    
        def stop(self):
            self._Thread__stop()
    

    두 번째는 ApplicationMixin 클래스입니다. 이 보조 객체는 기능과 속성을 추가하기 위해 사용자의 웹 요청 클래스 상속 있습니다. 이 채널 수신기가 이미 요청 채널의 존재 여부를 체크한다이 경우, 아무 것도 발견되지 않은 경우 하나 생성하고, WebRequest 클래스로 돌아 청취자 핸들을.

    # add a method to the application that will return existing channels
    # or create non-existing ones and then return them
    class ApplicationMixin(object):
        def GetChannel(self, channel, host = None, port = None):
            if channel not in self.application.channels:
                self.application.channels[channel] = OpenChannel(channel, host, port)
                self.application.channels[channel].start()
            return self.application.channels[channel]
    

    이 정적 목록 (마음에 베어링 것을 당신에게 줄 self.write 문자열 필요) 인 것처럼하여 WebRequest 클래스는 이제 청취자를 취급

    class ReadChannel(tornado.web.RequestHandler, ApplicationMixin):
        @tornado.web.asynchronous
        def get(self, channel):
            # get the channel
            channel = self.GetChannel(channel)
            # write out its entire contents as a list
            self.write('{}'.format(channel[:]))
            self.finish() # not necessary?
    

    응용 프로그램을 만든 후 마지막으로, 나는 속성으로 빈 사전을 추가

    # add a dictionary containing channels to your application
    application.channels = {}
    

    응용 프로그램을 종료하면 실행중인 스레드뿐만 아니라 몇 가지 정리,

    # clean up the subscribed channels
    for channel in application.channels:
        application.channels[channel].stop()
        application.channels[channel].join()
    

    전체 코드 :

    import threading
    import redis
    import tornado.web
    
    
    
    class OpenChannel(threading.Thread):
        def __init__(self, channel, host = None, port = None):
            threading.Thread.__init__(self)
            self.lock = threading.Lock()
            self.redis = redis.StrictRedis(host = host or 'localhost', port = port or 6379)
            self.pubsub = self.redis.pubsub()
            self.pubsub.subscribe(channel)
    
            self.output = []
    
        # lets implement basic getter methods on self.output, so you can access it like a regular list
        def __getitem__(self, item):
            with self.lock:
                return self.output[item]
    
        def __getslice__(self, start, stop = None, step = None):
            with self.lock:
                return self.output[start:stop:step]
    
        def __str__(self):
            with self.lock:
                return self.output.__str__()
    
        # thread loop
        def run(self):
            for message in self.pubsub.listen():
                with self.lock:
                    self.output.append(message['data'])
    
        def stop(self):
            self._Thread__stop()
    
    
    # add a method to the application that will return existing channels
    # or create non-existing ones and then return them
    class ApplicationMixin(object):
        def GetChannel(self, channel, host = None, port = None):
            if channel not in self.application.channels:
                self.application.channels[channel] = OpenChannel(channel, host, port)
                self.application.channels[channel].start()
            return self.application.channels[channel]
    
    class ReadChannel(tornado.web.RequestHandler, ApplicationMixin):
        @tornado.web.asynchronous
        def get(self, channel):
            # get the channel
            channel = self.GetChannel(channel)
            # write out its entire contents as a list
            self.write('{}'.format(channel[:]))
            self.finish() # not necessary?
    
    
    class GetHandler(tornado.web.RequestHandler):
    
        def get(self):
            self.write("Hello world")
    
    
    application = tornado.web.Application([
        (r"/", GetHandler),
        (r"/channel/(?P<channel>\S+)", ReadChannel),
    ])
    
    
    # add a dictionary containing channels to your application
    application.channels = {}
    
    
    if __name__ == '__main__':
        application.listen(8888)
        print 'running'
        try:
            tornado.ioloop.IOLoop.instance().start()
        except KeyboardInterrupt:
            pass
    
        # clean up the subscribed channels
        for channel in application.channels:
            application.channels[channel].stop()
            application.channels[channel].join()
    
  5. from https://stackoverflow.com/questions/15144809/how-can-i-use-tornado-and-redis-asynchronously by cc-by-sa and MIT license