[SPRING] WebClient로 초당 요청 수를 제한하는 방법은 무엇입니까?
SPRINGWebClient로 초당 요청 수를 제한하는 방법은 무엇입니까?
WebClient 개체를 사용하여 Http Post 요청을 서버에 보냅니다. 엄청난 양의 요청을 매우 빠르게 전송합니다 (QueueChannel에 약 4000 개의 메시지가 있음). 문제는 ... 서버가 충분히 빠르게 응답 할 수없는 것 같습니다 ... 그래서 서버 오류가 많이 발생하고 연결이 너무 일찍 종료됩니다.
초당 요청 수를 제한하는 방법이 있습니까? 또는 사용하는 스레드 수를 제한 하시겠습니까?
편집하다 :
QueueChannel의 메시지 끝점 처리 메시지 :
@MessageEndpoint
public class CustomServiceActivator {
private static final Logger logger = LogManager.getLogger();
@Autowired
IHttpService httpService;
@ServiceActivator(
inputChannel = "outputFilterChannel",
outputChannel = "outputHttpServiceChannel",
poller = @Poller( fixedDelay = "1000" )
)
public void processMessage(Data data) {
httpService.push(data);
try {
Thread.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
WebClient 서비스 클래스 :
@Service
public class HttpService implements IHttpService {
private static final String URL = "http://www.blabla.com/log";
private static final Logger logger = LogManager.getLogger();
@Autowired
WebClient webClient;
@Override
public void push(Data data) {
String body = constructString(data);
Mono<ResponseEntity<Response>> res = webClient.post()
.uri(URL + getLogType(data))
.contentLength(body.length())
.contentType(MediaType.APPLICATION_JSON)
.syncBody(body)
.exchange()
.flatMap(response -> response.toEntity(Response.class));
res.subscribe(new Consumer<ResponseEntity<Response>>() { ... });
}
}
해결법
-
==============================
1.질문 Reactor를 통한 요청 제한 속도는 응답자 두 명 (의견에 하나)
질문 Reactor를 통한 요청 제한 속도는 응답자 두 명 (의견에 하나)
속도 제한기로 작동하는 또 다른 플럭스
.zipWith (Flux.interval (Duration.of (1, ChronoUnit.SECONDS)))
각 웹 요청을 지연 시키십시오.
delayElements 함수를 사용한다.
편집 : 아래의 답변은 RestTemplate을 차단하는 데 유효하지만 반응 패턴에 잘 맞지 않습니다.
WebClient에는 요청을 제한 할 수있는 기능이 없지만 composition을 사용하여이 기능을 쉽게 추가 할 수 있습니다.
당신은 클라이언트를 외부에서 RateLimiter (Guava / (https://google.github.io/guava/releases/19.0/api/docs/index.html?com/google/common/util/concurrent/RateLimiter.html)
이 가이드에서는 http://www.baeldung.com/guava-rate-limiter에서 속도 제한 기능을 차단하는 방법 또는 제한 시간을 사용하는 방법을 설명합니다.
별도의 클래스에서 조절해야하는 모든 호출을 장식합니다.
-
==============================
2.나는 파티에 늦지 않았 으면 좋겠다. 어쨌든 요청 비율을 제한하는 것은 크롤러를 만들면서 일주일 전에 직면했던 문제 중 하나입니다. 문제는 다음과 같습니다.
나는 파티에 늦지 않았 으면 좋겠다. 어쨌든 요청 비율을 제한하는 것은 크롤러를 만들면서 일주일 전에 직면했던 문제 중 하나입니다. 문제는 다음과 같습니다.
해결책은 다음과 같습니다.
private Flux<HostListResponse> sequentialCrawl() { AtomicLong pageNo = new AtomicLong(2); // Solution for #1 - Flux.expand return getHosts(1) .doOnRequest(value -> LOGGER.info("Start crawling.")) .expand(hostListResponse -> { final long totalPages = hostListResponse.getData().getTotalPages(); long currPageNo = pageNo.getAndIncrement(); if (currPageNo <= totalPages) { LOGGER.info("Crawling page " + currPageNo + " of " + totalPages); // Solution for #2 return Mono.just(1).delayElement(Duration.ofSeconds(1)).then( getHosts(currPageNo) ); } return Flux.empty(); }) .doOnComplete(() -> LOGGER.info("End of crawling.")); } private Mono<HostListResponse> getHosts(long pageNo) { final String uri = hostListUrl + pageNo; LOGGER.info("Crawling " + uri); return webClient.get() .uri(uri) .exchange() // Solution for #3 .retryWhen(companion -> companion .zipWith(Flux.range(1, RETRY + 1), (error, index) -> { String message = "Failed to crawl uri: " + error.getMessage(); if (index <= RETRY && (error instanceof RequestIntervalTooShortException || error instanceof ConnectTimeoutException || "Connection reset by peer".equals(error.getMessage()) )) { LOGGER.info(message + ". Retries count: " + index); return Tuples.of(error, index); } else { LOGGER.warn(message); throw Exceptions.propagate(error); //terminate the source with the 4th `onError` } }) .map(tuple -> { // Solution for #4 Throwable e = tuple.getT1(); int delaySeconds = tuple.getT2(); // TODO: Adjust these values according to your needs if (e instanceof ConnectTimeoutException) { delaySeconds = delaySeconds * 5; } else if ("Connection reset by peer".equals(e.getMessage())) { // The API that this app is calling will sometimes think that the requests are SPAM. So let's rest longer before retrying the request. delaySeconds = delaySeconds * 10; } LOGGER.info("Will retry crawling after " + delaySeconds + " seconds to " + uri + "."); return Mono.delay(Duration.ofSeconds(delaySeconds)); }) .doOnNext(s -> LOGGER.warn("Request is too short - " + uri + ". Retried at " + LocalDateTime.now())) ) .flatMap(clientResponse -> clientResponse.toEntity(String.class)) .map(responseEntity -> { HttpStatus statusCode = responseEntity.getStatusCode(); if (statusCode != HttpStatus.OK) { Throwable exception; // Convert json string to Java POJO HostListResponse response = toHostListResponse(uri, statusCode, responseEntity.getBody()); // The API that I'm calling will return error code of 06 if request interval is too short if (statusCode == HttpStatus.BAD_REQUEST && "06".equals(response.getError().getCode())) { exception = new RequestIntervalTooShortException(uri); } else { exception = new IllegalStateException("Request to " + uri + " failed. Reason: " + responseEntity.getBody()); } throw Exceptions.propagate(exception); } else { return toHostListResponse(uri, statusCode, responseEntity.getBody()); } }); }
from https://stackoverflow.com/questions/50387584/how-to-limit-the-request-second-with-webclient by cc-by-sa and MIT license
'SPRING' 카테고리의 다른 글
[SPRING] Java 8에서 Spring AOP가 IllegalArgumentException을 발생시킵니다. (0) | 2019.05.16 |
---|---|
[SPRING] Spring 5의 Reactive Programming을 사용하여 웹 스레드의 최대 수를 실제로 "관리"하는 방법은 무엇입니까? (0) | 2019.05.16 |
[SPRING] Spring JMS (ActiveMQ)가 메시지 배달을 지연 시켰습니다. (0) | 2019.05.16 |
[SPRING] Jenkins Plugin의 spring-core 의존성 버전 오류 (0) | 2019.05.16 |
[SPRING] Spring MVC는 extension-less URL을 지원합니까? (0) | 2019.05.16 |