복붙노트

[SPRING] WebClient로 초당 요청 수를 제한하는 방법은 무엇입니까?

SPRING

WebClient로 초당 요청 수를 제한하는 방법은 무엇입니까?

WebClient 개체를 사용하여 Http Post 요청을 서버에 보냅니다. 엄청난 양의 요청을 매우 빠르게 전송합니다 (QueueChannel에 약 4000 개의 메시지가 있음). 문제는 ... 서버가 충분히 빠르게 응답 할 수없는 것 같습니다 ... 그래서 서버 오류가 많이 발생하고 연결이 너무 일찍 종료됩니다.

초당 요청 수를 제한하는 방법이 있습니까? 또는 사용하는 스레드 수를 제한 하시겠습니까?

편집하다 :

QueueChannel의 메시지 끝점 처리 메시지 :

@MessageEndpoint
public class CustomServiceActivator {

    private static final Logger logger = LogManager.getLogger();

    @Autowired
    IHttpService httpService;

    @ServiceActivator(
            inputChannel = "outputFilterChannel",
            outputChannel = "outputHttpServiceChannel",
            poller = @Poller( fixedDelay = "1000" )
    )
    public void processMessage(Data data) {
        httpService.push(data);
        try {
            Thread.sleep(20);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

WebClient 서비스 클래스 :

@Service
public class HttpService implements IHttpService {

    private static final String URL = "http://www.blabla.com/log";

    private static final Logger logger = LogManager.getLogger();

    @Autowired
    WebClient webClient;

    @Override
    public void push(Data data) {
        String body = constructString(data);
        Mono<ResponseEntity<Response>> res = webClient.post()
                .uri(URL + getLogType(data))
                .contentLength(body.length())
                .contentType(MediaType.APPLICATION_JSON)
                .syncBody(body)
                .exchange()
                .flatMap(response -> response.toEntity(Response.class));

        res.subscribe(new Consumer<ResponseEntity<Response>>() { ... });
    }
}

해결법

  1. ==============================

    1.질문 Reactor를 통한 요청 제한 속도는 응답자 두 명 (의견에 하나)

    질문 Reactor를 통한 요청 제한 속도는 응답자 두 명 (의견에 하나)

    속도 제한기로 작동하는 또 다른 플럭스

    .zipWith (Flux.interval (Duration.of (1, ChronoUnit.SECONDS)))

    각 웹 요청을 지연 시키십시오.

    delayElements 함수를 사용한다.

    편집 : 아래의 답변은 RestTemplate을 차단하는 데 유효하지만 반응 패턴에 잘 맞지 않습니다.

    WebClient에는 요청을 제한 할 수있는 기능이 없지만 composition을 사용하여이 기능을 쉽게 추가 할 수 있습니다.

    당신은 클라이언트를 외부에서 RateLimiter (Guava / (https://google.github.io/guava/releases/19.0/api/docs/index.html?com/google/common/util/concurrent/RateLimiter.html)

    이 가이드에서는 http://www.baeldung.com/guava-rate-limiter에서 속도 제한 기능을 차단하는 방법 또는 제한 시간을 사용하는 방법을 설명합니다.

    별도의 클래스에서 조절해야하는 모든 호출을 장식합니다.

  2. ==============================

    2.나는 파티에 늦지 않았 으면 좋겠다. 어쨌든 요청 비율을 제한하는 것은 크롤러를 만들면서 일주일 전에 직면했던 문제 중 하나입니다. 문제는 다음과 같습니다.

    나는 파티에 늦지 않았 으면 좋겠다. 어쨌든 요청 비율을 제한하는 것은 크롤러를 만들면서 일주일 전에 직면했던 문제 중 하나입니다. 문제는 다음과 같습니다.

    해결책은 다음과 같습니다.

    private Flux<HostListResponse> sequentialCrawl() {
        AtomicLong pageNo = new AtomicLong(2);
        // Solution for #1 - Flux.expand
        return getHosts(1)
            .doOnRequest(value -> LOGGER.info("Start crawling."))
            .expand(hostListResponse -> { 
                final long totalPages = hostListResponse.getData().getTotalPages();
                long currPageNo = pageNo.getAndIncrement();
                if (currPageNo <= totalPages) {
                    LOGGER.info("Crawling page " + currPageNo + " of " + totalPages);
                    // Solution for #2
                    return Mono.just(1).delayElement(Duration.ofSeconds(1)).then(
                        getHosts(currPageNo)
                    );
                }
                return Flux.empty();
            })
            .doOnComplete(() -> LOGGER.info("End of crawling."));
    }
    
    private Mono<HostListResponse> getHosts(long pageNo) {
        final String uri = hostListUrl + pageNo;
        LOGGER.info("Crawling " + uri);
    
        return webClient.get()
            .uri(uri)
            .exchange()
            // Solution for #3
            .retryWhen(companion -> companion
                .zipWith(Flux.range(1, RETRY + 1), (error, index) -> {
                    String message = "Failed to crawl uri: " + error.getMessage();
                    if (index <= RETRY && (error instanceof RequestIntervalTooShortException
                        || error instanceof ConnectTimeoutException
                        || "Connection reset by peer".equals(error.getMessage())
                    )) {
                        LOGGER.info(message + ". Retries count: " + index);
                        return Tuples.of(error, index);
                    } else {
                        LOGGER.warn(message);
                        throw Exceptions.propagate(error); //terminate the source with the 4th `onError`
                    }
                })
                .map(tuple -> {
                    // Solution for #4
                    Throwable e = tuple.getT1();
                    int delaySeconds = tuple.getT2();
                    // TODO: Adjust these values according to your needs
                    if (e instanceof ConnectTimeoutException) {
                        delaySeconds = delaySeconds * 5;
                    } else if ("Connection reset by peer".equals(e.getMessage())) {
                        // The API that this app is calling will sometimes think that the requests are SPAM. So let's rest longer before retrying the request.
                        delaySeconds = delaySeconds * 10;
                    }
                    LOGGER.info("Will retry crawling after " + delaySeconds + " seconds to " + uri + ".");
                    return Mono.delay(Duration.ofSeconds(delaySeconds));
                })
                .doOnNext(s -> LOGGER.warn("Request is too short - " + uri + ". Retried at " + LocalDateTime.now()))
            )
            .flatMap(clientResponse -> clientResponse.toEntity(String.class))
            .map(responseEntity -> {
                HttpStatus statusCode = responseEntity.getStatusCode();
                if (statusCode != HttpStatus.OK) {
                    Throwable exception;
                    // Convert json string to Java POJO
                    HostListResponse response = toHostListResponse(uri, statusCode, responseEntity.getBody());
                    // The API that I'm calling will return error code of 06 if request interval is too short
                    if (statusCode == HttpStatus.BAD_REQUEST && "06".equals(response.getError().getCode())) {
                        exception = new RequestIntervalTooShortException(uri);
                    } else {
                        exception = new IllegalStateException("Request to " + uri + " failed. Reason: " + responseEntity.getBody());
                    }
                    throw Exceptions.propagate(exception);
                } else {
                    return toHostListResponse(uri, statusCode, responseEntity.getBody());
                }
            });
    }
    
  3. from https://stackoverflow.com/questions/50387584/how-to-limit-the-request-second-with-webclient by cc-by-sa and MIT license