복붙노트

[SPRING] 클러스터 환경에서 실행되는 Spring Scheduled Task

SPRING

클러스터 환경에서 실행되는 Spring Scheduled Task

60 초마다 실행되는 cron 작업이있는 응용 프로그램을 작성하고 있습니다. 응용 프로그램은 여러 인스턴스에 필요할 때 확장되도록 구성됩니다. 나는 단지 60 초마다 1 인스턴스 (모든 노드에서)에서 작업을 실행하기를 원합니다. 상자에서 벗어나서 나는 이것에 대한 해결책을 찾을 수 없으며 전에 여러 번 물어 본 적이 없다는 사실에 놀랐습니다. Spring 4.1.6을 사용하고 있습니다.

    <task:scheduled-tasks>
        <task:scheduled ref="beanName" method="execute" cron="0/60 * * * * *"/>
    </task:scheduled-tasks>

모든 지침은 크게 감사하겠습니다. 감사

해결법

  1. ==============================

    1.일괄 작업과 예약 된 작업은 대개 고객 용 응용 프로그램이 아닌 독립 실행 형 서버에서 실행되므로 클러스터에서 실행될 것으로 예상되는 응용 프로그램에 작업을 포함하는 것이 일반적 요구 사항은 아닙니다. 또한 클러스터 된 환경의 작업은 일반적으로 병렬로 실행되는 동일한 작업의 다른 인스턴스에 대해 걱정할 필요가 없으므로 작업 인스턴스를 격리하는 것이 큰 요구 사항은 아닙니다.

    일괄 작업과 예약 된 작업은 대개 고객 용 응용 프로그램이 아닌 독립 실행 형 서버에서 실행되므로 클러스터에서 실행될 것으로 예상되는 응용 프로그램에 작업을 포함하는 것이 일반적 요구 사항은 아닙니다. 또한 클러스터 된 환경의 작업은 일반적으로 병렬로 실행되는 동일한 작업의 다른 인스턴스에 대해 걱정할 필요가 없으므로 작업 인스턴스를 격리하는 것이 큰 요구 사항은 아닙니다.

    간단한 해결책은 Spring Profile 내부에서 작업을 구성하는 것입니다. 예를 들어 현재 구성이 다음과 같은 경우 :

    <beans>
      <bean id="someBean" .../>
    
      <task:scheduled-tasks>
        <task:scheduled ref="someBean" method="execute" cron="0/60 * * * * *"/>
      </task:scheduled-tasks>
    </beans>
    

    그것을 다음과 같이 변경하십시오 :

    <beans>
      <beans profile="scheduled">
        <bean id="someBean" .../>
    
        <task:scheduled-tasks>
          <task:scheduled ref="someBean" method="execute" cron="0/60 * * * * *"/>
        </task:scheduled-tasks>
      </beans>
    </beans>
    

    그런 다음 예약 된 프로필이 활성화 된 단일 시스템에서 응용 프로그램을 시작하십시오 (-Dspring.profiles.active = scheduled).

    어떤 이유로 주 서버를 사용할 수 없게 된 경우 프로필이 활성화 된 다른 서버를 시작하면 모든 작업이 계속 잘됩니다.

    작업에 자동 장애 조치를 원할 경우 상황이 변경됩니다. 그런 다음 모든 서버에서 작업을 계속 실행하고 데이터베이스 테이블, 클러스터 된 캐시, JMX 변수 등과 같은 공통 리소스를 통해 동기화를 확인해야합니다.

  2. ==============================

    2.이 목적을 정확히 수행하는 ShedLock 프로젝트가 있습니다. 실행시 잠글 수있는 작업에 주석을 달뿐입니다.

    이 목적을 정확히 수행하는 ShedLock 프로젝트가 있습니다. 실행시 잠글 수있는 작업에 주석을 달뿐입니다.

    @Scheduled( ... )
    @SchedulerLock(name = "scheduledTaskName")
    public void scheduledTask() {
       // do something
    }
    

    Spring과 LockProvider 구성 (SQL과 Mongo는 현재 지원됨)

    @Bean
    public TaskScheduler taskScheduler(LockProvider lockProvider) {
       int poolSize = 10;
       return SpringLockableTaskSchedulerFactory
                 .newLockableTaskScheduler(poolSize, lockProvider);
    }
    
  3. ==============================

    3.이 목적을 위해 JDBC-JobStore와 Quartz Clustering을 사용해야한다고 생각합니다.

    이 목적을 위해 JDBC-JobStore와 Quartz Clustering을 사용해야한다고 생각합니다.

  4. ==============================

    4.또한 클러스터에서 작업을 안전하게 실행할 수있는 간단하면서도 강력한 방법입니다. 노드가 클러스터의 "리더"인 경우에만 데이터베이스를 기반으로 작업을 실행할 수 있습니다.

    또한 클러스터에서 작업을 안전하게 실행할 수있는 간단하면서도 강력한 방법입니다. 노드가 클러스터의 "리더"인 경우에만 데이터베이스를 기반으로 작업을 실행할 수 있습니다.

    또한 클러스터에서 노드가 실패하거나 종료되면 다른 노드가 리더가됩니다.

    당신이 가진 것은 리더 선거 메커니즘을 만들고 매번 리더가 리더인지 확인하는 것입니다.

    @Scheduled(cron = "*/30 * * * * *")
    public void executeFailedEmailTasks() {
        if (checkIfLeader()) {
            final List<EmailTask> list = emailTaskService.getFailedEmailTasks();
            for (EmailTask emailTask : list) {
                dispatchService.sendEmail(emailTask);
            }
        }
    }
    

    다음 단계를 따르십시오.

    1. 클러스터의 노드 당 하나의 항목을 보유하고있는 객체와 테이블을 정의하십시오.

    @Entity(name = "SYS_NODE")
    public class SystemNode {
    
    /** The id. */
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    
    /** The name. */
    @Column(name = "TIMESTAMP")
    private String timestamp;
    
    /** The ip. */
    @Column(name = "IP")
    private String ip;
    
    /** The last ping. */
    @Column(name = "LAST_PING")
    private Date lastPing;
    
    /** The last ping. */
    @Column(name = "CREATED_AT")
    private Date createdAt = new Date();
    
    /** The last ping. */
    @Column(name = "IS_LEADER")
    private Boolean isLeader = Boolean.FALSE;
    
    public Long getId() {
        return id;
    }
    
    public void setId(final Long id) {
        this.id = id;
    }
    
    public String getTimestamp() {
        return timestamp;
    }
    
    public void setTimestamp(final String timestamp) {
        this.timestamp = timestamp;
    }
    
    public String getIp() {
        return ip;
    }
    
    public void setIp(final String ip) {
        this.ip = ip;
    }
    
    public Date getLastPing() {
        return lastPing;
    }
    
    public void setLastPing(final Date lastPing) {
        this.lastPing = lastPing;
    }
    
    public Date getCreatedAt() {
        return createdAt;
    }
    
    public void setCreatedAt(final Date createdAt) {
        this.createdAt = createdAt;
    }
    
    public Boolean getIsLeader() {
        return isLeader;
    }
    
    public void setIsLeader(final Boolean isLeader) {
        this.isLeader = isLeader;
    }
    
    @Override
    public String toString() {
        return "SystemNode{" +
                "id=" + id +
                ", timestamp='" + timestamp + '\'' +
                ", ip='" + ip + '\'' +
                ", lastPing=" + lastPing +
                ", createdAt=" + createdAt +
                ", isLeader=" + isLeader +
                '}';
    }
    

    }

    2. a) 데이터베이스에 노드를 삽입하고 b) 리더를 확인하는 서비스를 만든다.

    @Service
    @Transactional
    public class SystemNodeServiceImpl implements SystemNodeService,    ApplicationListener {
    
    /** The logger. */
    private static final Logger LOGGER = Logger.getLogger(SystemNodeService.class);
    
    /** The constant NO_ALIVE_NODES. */
    private static final String NO_ALIVE_NODES = "Not alive nodes found in list {0}";
    
    /** The ip. */
    private String ip;
    
    /** The system service. */
    private SystemService systemService;
    
    /** The system node repository. */
    private SystemNodeRepository systemNodeRepository;
    
    @Autowired
    public void setSystemService(final SystemService systemService) {
        this.systemService = systemService;
    }
    
    @Autowired
    public void setSystemNodeRepository(final SystemNodeRepository systemNodeRepository) {
        this.systemNodeRepository = systemNodeRepository;
    }
    
    @Override
    public void pingNode() {
        final SystemNode node = systemNodeRepository.findByIp(ip);
        if (node == null) {
            createNode();
        } else {
            updateNode(node);
        }
    }
    
    @Override
    public void checkLeaderShip() {
        final List<SystemNode> allList = systemNodeRepository.findAll();
        final List<SystemNode> aliveList = filterAliveNodes(allList);
    
        SystemNode leader = findLeader(allList);
        if (leader != null && aliveList.contains(leader)) {
            setLeaderFlag(allList, Boolean.FALSE);
            leader.setIsLeader(Boolean.TRUE);
            systemNodeRepository.save(allList);
        } else {
            final SystemNode node = findMinNode(aliveList);
    
            setLeaderFlag(allList, Boolean.FALSE);
            node.setIsLeader(Boolean.TRUE);
            systemNodeRepository.save(allList);
        }
    }
    
    /**
     * Returns the leaded
     * @param list
     *          the list
     * @return  the leader
     */
    private SystemNode findLeader(final List<SystemNode> list) {
        for (SystemNode systemNode : list) {
            if (systemNode.getIsLeader()) {
                return systemNode;
            }
        }
        return null;
    }
    
    @Override
    public boolean isLeader() {
        final SystemNode node = systemNodeRepository.findByIp(ip);
        return node != null && node.getIsLeader();
    }
    
    @Override
    public void onApplicationEvent(final ApplicationEvent applicationEvent) {
        try {
            ip = InetAddress.getLocalHost().getHostAddress();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        if (applicationEvent instanceof ContextRefreshedEvent) {
            pingNode();
        }
    }
    
    /**
     * Creates the node
     */
    private void createNode() {
        final SystemNode node = new SystemNode();
        node.setIp(ip);
        node.setTimestamp(String.valueOf(System.currentTimeMillis()));
        node.setCreatedAt(new Date());
        node.setLastPing(new Date());
        node.setIsLeader(CollectionUtils.isEmpty(systemNodeRepository.findAll()));
        systemNodeRepository.save(node);
    }
    
    /**
     * Updates the node
     */
    private void updateNode(final SystemNode node) {
        node.setLastPing(new Date());
        systemNodeRepository.save(node);
    }
    
    /**
     * Returns the alive nodes.
     *
     * @param list
     *         the list
     * @return the alive nodes
     */
    private List<SystemNode> filterAliveNodes(final List<SystemNode> list) {
        int timeout = systemService.getSetting(SettingEnum.SYSTEM_CONFIGURATION_SYSTEM_NODE_ALIVE_TIMEOUT, Integer.class);
        final List<SystemNode> finalList = new LinkedList<>();
        for (SystemNode systemNode : list) {
            if (!DateUtils.hasExpired(systemNode.getLastPing(), timeout)) {
                finalList.add(systemNode);
            }
        }
        if (CollectionUtils.isEmpty(finalList)) {
            LOGGER.warn(MessageFormat.format(NO_ALIVE_NODES, list));
            throw new RuntimeException(MessageFormat.format(NO_ALIVE_NODES, list));
        }
        return finalList;
    }
    
    /**
     * Finds the min name node.
     *
     * @param list
     *         the list
     * @return the min node
     */
    private SystemNode findMinNode(final List<SystemNode> list) {
        SystemNode min = list.get(0);
        for (SystemNode systemNode : list) {
            if (systemNode.getTimestamp().compareTo(min.getTimestamp()) < -1) {
                min = systemNode;
            }
        }
        return min;
    }
    
    /**
     * Sets the leader flag.
     *
     * @param list
     *         the list
     * @param value
     *         the value
     */
    private void setLeaderFlag(final List<SystemNode> list, final Boolean value) {
        for (SystemNode systemNode : list) {
            systemNode.setIsLeader(value);
        }
    }
    

    }

    3.ping 데이터베이스가 당신이 살아있는 보내

    @Override
    @Scheduled(cron = "0 0/5 * * * ?")
    public void executeSystemNodePing() {
        systemNodeService.pingNode();
    }
    
    @Override
    @Scheduled(cron = "0 0/10 * * * ?")
    public void executeLeaderResolution() {
        systemNodeService.checkLeaderShip();
    }
    

    4. 준비가되었습니다! 작업을 실행하기 전에 리더인지 확인하십시오.

    @Override
    @Scheduled(cron = "*/30 * * * * *")
    public void executeFailedEmailTasks() {
        if (checkIfLeader()) {
            final List<EmailTask> list = emailTaskService.getFailedEmailTasks();
            for (EmailTask emailTask : list) {
                dispatchService.sendEmail(emailTask);
            }
        }
    }
    
  5. ==============================

    5.잠금을 수행하기 위해 데이터베이스 테이블을 사용하고 있습니다. 한 번에 하나의 작업 만 테이블에 삽입 할 수 있습니다. 다른 하나는 DuplicateKeyException을 얻습니다. 삽입 및 삭제 로직은 @Scheduled 주석 주변의 한 측면으로 전달됩니다. 스프링 부트 2.0을 사용하고 있습니다.

    잠금을 수행하기 위해 데이터베이스 테이블을 사용하고 있습니다. 한 번에 하나의 작업 만 테이블에 삽입 할 수 있습니다. 다른 하나는 DuplicateKeyException을 얻습니다. 삽입 및 삭제 로직은 @Scheduled 주석 주변의 한 측면으로 전달됩니다. 스프링 부트 2.0을 사용하고 있습니다.

    @Component
    @Aspect
    public class SchedulerLock {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerLock.class);
    
        @Autowired
        private JdbcTemplate jdbcTemplate;  
    
        @Around("execution(@org.springframework.scheduling.annotation.Scheduled * *(..))")
        public Object lockTask(ProceedingJoinPoint joinPoint) throws Throwable {
    
            String jobSignature = joinPoint.getSignature().toString();
            try {
                jdbcTemplate.update("INSERT INTO scheduler_lock (signature, date) VALUES (?, ?)", new Object[] {jobSignature, new Date()});
    
                Object proceed = joinPoint.proceed();
    
                jdbcTemplate.update("DELETE FROM scheduler_lock WHERE lock_signature = ?", new Object[] {jobSignature});
                return proceed;
    
            }catch (DuplicateKeyException e) {
                LOGGER.warn("Job is currently locked: "+jobSignature);
                return null;
            }
        }
    }
    

    @Component
    public class EveryTenSecondJob {
    
        @Scheduled(cron = "0/10 * * * * *")
        public void taskExecution() {
            System.out.println("Hello World");
        }
    }
    

    CREATE TABLE scheduler_lock(
        signature varchar(255) NOT NULL,
        date datetime DEFAULT NULL,
        PRIMARY KEY(signature)
    );
    
  6. ==============================

    6.dlock은 데이터베이스 색인 및 제약 조건을 사용하여 한 번만 작업을 실행하도록 설계되었습니다. 다음과 같이 간단하게 할 수 있습니다.

    dlock은 데이터베이스 색인 및 제약 조건을 사용하여 한 번만 작업을 실행하도록 설계되었습니다. 다음과 같이 간단하게 할 수 있습니다.

    @Scheduled(cron = "30 30 3 * * *")
    @TryLock(name = "executeMyTask", owner = SERVER_NAME, lockFor = THREE_MINUTES)
    public void execute() {
    
    }
    

    그것을 사용하는 방법에 대한 기사를 참조하십시오.

  7. ==============================

    7.db-scheduler와 같은 임베디드 스케줄러를 사용하여이를 수행 할 수 있습니다. 그것은 영구적 인 실행을 가지며 단순한 낙관적 인 잠금 메커니즘을 사용하여 단일 노드의 실행을 보장합니다.

    db-scheduler와 같은 임베디드 스케줄러를 사용하여이를 수행 할 수 있습니다. 그것은 영구적 인 실행을 가지며 단순한 낙관적 인 잠금 메커니즘을 사용하여 단일 노드의 실행을 보장합니다.

    유스 케이스를 달성하는 방법에 대한 예제 코드 :

       RecurringTask<Void> recurring1 = Tasks.recurring("my-task-name", FixedDelay.of(Duration.ofSeconds(60)))
        .execute((taskInstance, executionContext) -> {
            System.out.println("Executing " + taskInstance.getTaskAndInstance());
        });
    
       final Scheduler scheduler = Scheduler
              .create(dataSource)
              .startTasks(recurring1)
              .build();
    
       scheduler.start();
    
  8. from https://stackoverflow.com/questions/31288810/spring-scheduled-task-running-in-clustered-environment by cc-by-sa and MIT license