[REDIS] 어떻게 비동기 토네이도와 레디 스를 사용할 수 있습니까?
REDIS어떻게 비동기 토네이도와 레디 스를 사용할 수 있습니까?
내가 비동기 레디 스 토네이도를 사용할 수있는 방법을 찾기 위해 노력하고있어. 나는 토네이도 레디 스를 찾았지만 나는 더 많은 단지 코드의 수율을 추가보다 필요합니다.
나는 다음과 같은 코드가 있습니다 :
import redis
import tornado.web
class WaiterHandler(tornado.web.RequestHandler):
@tornado.web.asynchronous
def get(self):
client = redis.StrictRedis(port=6279)
pubsub = client.pubsub()
pubsub.subscribe('test_channel')
for item in pubsub.listen():
if item['type'] == 'message':
print item['channel']
print item['data']
self.write(item['data'])
self.finish()
class GetHandler(tornado.web.RequestHandler):
def get(self):
self.write("Hello world")
application = tornado.web.Application([
(r"/", GetHandler),
(r"/wait", WaiterHandler),
])
if __name__ == '__main__':
application.listen(8888)
print 'running'
tornado.ioloop.IOLoop.instance().start()
나는 액세스에게 / URL을 가져오고 / 대기에서 보류중인 요청이있는 동안은 "안녕하세요"를 얻을 필요가 없다. 어떻게하니?
해결법
-
==============================
1.는 IO 루프를 차단하는 것 같이, 주 토네이도 스레드에서 레디 스 펍 / 서브을 사용하지 말아야합니다. 당신은 메인 스레드에서 웹 클라이언트에서 롱 폴링을 처리 할 수 있습니다,하지만 당신은 레디 스 청취를위한 별도의 스레드를 만들어야합니다. 그런 다음 ioloop.add_callback 사용할 수 있습니다 () 및 / 또는 메시지를받을 때 threading.Queue는 메인 쓰레드와 통신 할 수 있습니다.
는 IO 루프를 차단하는 것 같이, 주 토네이도 스레드에서 레디 스 펍 / 서브을 사용하지 말아야합니다. 당신은 메인 스레드에서 웹 클라이언트에서 롱 폴링을 처리 할 수 있습니다,하지만 당신은 레디 스 청취를위한 별도의 스레드를 만들어야합니다. 그런 다음 ioloop.add_callback 사용할 수 있습니다 () 및 / 또는 메시지를받을 때 threading.Queue는 메인 쓰레드와 통신 할 수 있습니다.
-
==============================
2.당신은 토네이도 IOLoop 호환 레디 스 클라이언트를 사용해야합니다.
당신은 토네이도 IOLoop 호환 레디 스 클라이언트를 사용해야합니다.
가능한 그들 중 몇 가지가 있습니다, toredis, brukva 등
여기 toredis에서 pubsub의 예입니다 : https://github.com/mrjoes/toredis/blob/master/tests/test_handler.py
-
==============================
3.파이썬> = 3.3, 나는 aioredis를 사용하도록 권합니다. 나는 아래의 코드를 테스트하지 않았다하지만 그런 일해야한다 :
파이썬> = 3.3, 나는 aioredis를 사용하도록 권합니다. 나는 아래의 코드를 테스트하지 않았다하지만 그런 일해야한다 :
import redis import tornado.web from tornado.web import RequestHandler import aioredis import asyncio from aioredis.pubsub import Receiver class WaiterHandler(tornado.web.RequestHandler): @tornado.web.asynchronous def get(self): client = await aioredis.create_redis((host, 6279), encoding="utf-8", loop=IOLoop.instance().asyncio_loop) ch = redis.channels['test_channel'] result = None while await ch.wait_message(): item = await ch.get() if item['type'] == 'message': print item['channel'] print item['data'] result = item['data'] self.write(result) self.finish() class GetHandler(tornado.web.RequestHandler): def get(self): self.write("Hello world") application = tornado.web.Application([ (r"/", GetHandler), (r"/wait", WaiterHandler), ]) if __name__ == '__main__': print 'running' tornado.ioloop.IOLoop.configure('tornado.platform.asyncio.AsyncIOLoop') server = tornado.httpserver.HTTPServer(application) server.bind(8888) # zero means creating as many processes as there are cores. server.start(0) tornado.ioloop.IOLoop.instance().start()
-
==============================
4.좋아, 여기에 내가 GET 요청 함께 할 것입니다 방법을 내 예입니다.
좋아, 여기에 내가 GET 요청 함께 할 것입니다 방법을 내 예입니다.
나는 두 가지 주요 구성 요소를 추가 :
제 로컬리스트 객체에 새로운 메시지를 추가하는 간단한 나사 pubsub 수신기이다. 정기적 목록에서 읽는 것처럼 당신이 리스너 스레드에서 읽을 수 있도록 또한, 클래스에리스트 접근을 추가했다. 지금까지 당신의 WebRequest 클래스에 관한 한, 당신은 단지 로컬 목록 개체에서 데이터를 읽는. 이 수익률은 즉시 접수 및 처리중인 요청을 완료 또는 미래에서 현재 요청을 차단하지 않습니다.
class OpenChannel(threading.Thread): def __init__(self, channel, host = None, port = None): threading.Thread.__init__(self) self.lock = threading.Lock() self.redis = redis.StrictRedis(host = host or 'localhost', port = port or 6379) self.pubsub = self.redis.pubsub() self.pubsub.subscribe(channel) self.output = [] # lets implement basic getter methods on self.output, so you can access it like a regular list def __getitem__(self, item): with self.lock: return self.output[item] def __getslice__(self, start, stop = None, step = None): with self.lock: return self.output[start:stop:step] def __str__(self): with self.lock: return self.output.__str__() # thread loop def run(self): for message in self.pubsub.listen(): with self.lock: self.output.append(message['data']) def stop(self): self._Thread__stop()
두 번째는 ApplicationMixin 클래스입니다. 이 보조 객체는 기능과 속성을 추가하기 위해 사용자의 웹 요청 클래스 상속 있습니다. 이 채널 수신기가 이미 요청 채널의 존재 여부를 체크한다이 경우, 아무 것도 발견되지 않은 경우 하나 생성하고, WebRequest 클래스로 돌아 청취자 핸들을.
# add a method to the application that will return existing channels # or create non-existing ones and then return them class ApplicationMixin(object): def GetChannel(self, channel, host = None, port = None): if channel not in self.application.channels: self.application.channels[channel] = OpenChannel(channel, host, port) self.application.channels[channel].start() return self.application.channels[channel]
이 정적 목록 (마음에 베어링 것을 당신에게 줄 self.write 문자열 필요) 인 것처럼하여 WebRequest 클래스는 이제 청취자를 취급
class ReadChannel(tornado.web.RequestHandler, ApplicationMixin): @tornado.web.asynchronous def get(self, channel): # get the channel channel = self.GetChannel(channel) # write out its entire contents as a list self.write('{}'.format(channel[:])) self.finish() # not necessary?
응용 프로그램을 만든 후 마지막으로, 나는 속성으로 빈 사전을 추가
# add a dictionary containing channels to your application application.channels = {}
응용 프로그램을 종료하면 실행중인 스레드뿐만 아니라 몇 가지 정리,
# clean up the subscribed channels for channel in application.channels: application.channels[channel].stop() application.channels[channel].join()
전체 코드 :
import threading import redis import tornado.web class OpenChannel(threading.Thread): def __init__(self, channel, host = None, port = None): threading.Thread.__init__(self) self.lock = threading.Lock() self.redis = redis.StrictRedis(host = host or 'localhost', port = port or 6379) self.pubsub = self.redis.pubsub() self.pubsub.subscribe(channel) self.output = [] # lets implement basic getter methods on self.output, so you can access it like a regular list def __getitem__(self, item): with self.lock: return self.output[item] def __getslice__(self, start, stop = None, step = None): with self.lock: return self.output[start:stop:step] def __str__(self): with self.lock: return self.output.__str__() # thread loop def run(self): for message in self.pubsub.listen(): with self.lock: self.output.append(message['data']) def stop(self): self._Thread__stop() # add a method to the application that will return existing channels # or create non-existing ones and then return them class ApplicationMixin(object): def GetChannel(self, channel, host = None, port = None): if channel not in self.application.channels: self.application.channels[channel] = OpenChannel(channel, host, port) self.application.channels[channel].start() return self.application.channels[channel] class ReadChannel(tornado.web.RequestHandler, ApplicationMixin): @tornado.web.asynchronous def get(self, channel): # get the channel channel = self.GetChannel(channel) # write out its entire contents as a list self.write('{}'.format(channel[:])) self.finish() # not necessary? class GetHandler(tornado.web.RequestHandler): def get(self): self.write("Hello world") application = tornado.web.Application([ (r"/", GetHandler), (r"/channel/(?P<channel>\S+)", ReadChannel), ]) # add a dictionary containing channels to your application application.channels = {} if __name__ == '__main__': application.listen(8888) print 'running' try: tornado.ioloop.IOLoop.instance().start() except KeyboardInterrupt: pass # clean up the subscribed channels for channel in application.channels: application.channels[channel].stop() application.channels[channel].join()
from https://stackoverflow.com/questions/15144809/how-can-i-use-tornado-and-redis-asynchronously by cc-by-sa and MIT license
'REDIS' 카테고리의 다른 글
[REDIS] sidekiq 두 개의 별도의 레디 스 인스턴스에 대한 작업? (0) | 2020.01.13 |
---|---|
[REDIS] 서비스 스택 레디 스 목록 업데이트 (0) | 2020.01.13 |
[REDIS] 시작 설정 파일에 서버를 레디 스 (0) | 2020.01.13 |
[REDIS] 캔 레디 스 PostgreSQL을 같은 데이터베이스에 쓰는? (0) | 2020.01.13 |
[REDIS] 레디 스 서버 1024M의 maxheap보다 더 실행할 수 없습니다 (0) | 2020.01.13 |