[SPRING] 클러스터 환경에서 실행되는 Spring Scheduled Task
SPRING클러스터 환경에서 실행되는 Spring Scheduled Task
60 초마다 실행되는 cron 작업이있는 응용 프로그램을 작성하고 있습니다. 응용 프로그램은 여러 인스턴스에 필요할 때 확장되도록 구성됩니다. 나는 단지 60 초마다 1 인스턴스 (모든 노드에서)에서 작업을 실행하기를 원합니다. 상자에서 벗어나서 나는 이것에 대한 해결책을 찾을 수 없으며 전에 여러 번 물어 본 적이 없다는 사실에 놀랐습니다. Spring 4.1.6을 사용하고 있습니다.
<task:scheduled-tasks>
<task:scheduled ref="beanName" method="execute" cron="0/60 * * * * *"/>
</task:scheduled-tasks>
모든 지침은 크게 감사하겠습니다. 감사
해결법
-
==============================
1.일괄 작업과 예약 된 작업은 대개 고객 용 응용 프로그램이 아닌 독립 실행 형 서버에서 실행되므로 클러스터에서 실행될 것으로 예상되는 응용 프로그램에 작업을 포함하는 것이 일반적 요구 사항은 아닙니다. 또한 클러스터 된 환경의 작업은 일반적으로 병렬로 실행되는 동일한 작업의 다른 인스턴스에 대해 걱정할 필요가 없으므로 작업 인스턴스를 격리하는 것이 큰 요구 사항은 아닙니다.
일괄 작업과 예약 된 작업은 대개 고객 용 응용 프로그램이 아닌 독립 실행 형 서버에서 실행되므로 클러스터에서 실행될 것으로 예상되는 응용 프로그램에 작업을 포함하는 것이 일반적 요구 사항은 아닙니다. 또한 클러스터 된 환경의 작업은 일반적으로 병렬로 실행되는 동일한 작업의 다른 인스턴스에 대해 걱정할 필요가 없으므로 작업 인스턴스를 격리하는 것이 큰 요구 사항은 아닙니다.
간단한 해결책은 Spring Profile 내부에서 작업을 구성하는 것입니다. 예를 들어 현재 구성이 다음과 같은 경우 :
<beans> <bean id="someBean" .../> <task:scheduled-tasks> <task:scheduled ref="someBean" method="execute" cron="0/60 * * * * *"/> </task:scheduled-tasks> </beans>
그것을 다음과 같이 변경하십시오 :
<beans> <beans profile="scheduled"> <bean id="someBean" .../> <task:scheduled-tasks> <task:scheduled ref="someBean" method="execute" cron="0/60 * * * * *"/> </task:scheduled-tasks> </beans> </beans>
그런 다음 예약 된 프로필이 활성화 된 단일 시스템에서 응용 프로그램을 시작하십시오 (-Dspring.profiles.active = scheduled).
어떤 이유로 주 서버를 사용할 수 없게 된 경우 프로필이 활성화 된 다른 서버를 시작하면 모든 작업이 계속 잘됩니다.
작업에 자동 장애 조치를 원할 경우 상황이 변경됩니다. 그런 다음 모든 서버에서 작업을 계속 실행하고 데이터베이스 테이블, 클러스터 된 캐시, JMX 변수 등과 같은 공통 리소스를 통해 동기화를 확인해야합니다.
-
==============================
2.이 목적을 정확히 수행하는 ShedLock 프로젝트가 있습니다. 실행시 잠글 수있는 작업에 주석을 달뿐입니다.
이 목적을 정확히 수행하는 ShedLock 프로젝트가 있습니다. 실행시 잠글 수있는 작업에 주석을 달뿐입니다.
@Scheduled( ... ) @SchedulerLock(name = "scheduledTaskName") public void scheduledTask() { // do something }
Spring과 LockProvider 구성 (SQL과 Mongo는 현재 지원됨)
@Bean public TaskScheduler taskScheduler(LockProvider lockProvider) { int poolSize = 10; return SpringLockableTaskSchedulerFactory .newLockableTaskScheduler(poolSize, lockProvider); }
-
==============================
3.이 목적을 위해 JDBC-JobStore와 Quartz Clustering을 사용해야한다고 생각합니다.
이 목적을 위해 JDBC-JobStore와 Quartz Clustering을 사용해야한다고 생각합니다.
-
==============================
4.또한 클러스터에서 작업을 안전하게 실행할 수있는 간단하면서도 강력한 방법입니다. 노드가 클러스터의 "리더"인 경우에만 데이터베이스를 기반으로 작업을 실행할 수 있습니다.
또한 클러스터에서 작업을 안전하게 실행할 수있는 간단하면서도 강력한 방법입니다. 노드가 클러스터의 "리더"인 경우에만 데이터베이스를 기반으로 작업을 실행할 수 있습니다.
또한 클러스터에서 노드가 실패하거나 종료되면 다른 노드가 리더가됩니다.
당신이 가진 것은 리더 선거 메커니즘을 만들고 매번 리더가 리더인지 확인하는 것입니다.
@Scheduled(cron = "*/30 * * * * *") public void executeFailedEmailTasks() { if (checkIfLeader()) { final List<EmailTask> list = emailTaskService.getFailedEmailTasks(); for (EmailTask emailTask : list) { dispatchService.sendEmail(emailTask); } } }
다음 단계를 따르십시오.
1. 클러스터의 노드 당 하나의 항목을 보유하고있는 객체와 테이블을 정의하십시오.
@Entity(name = "SYS_NODE") public class SystemNode { /** The id. */ @Id @GeneratedValue(strategy = GenerationType.IDENTITY) private Long id; /** The name. */ @Column(name = "TIMESTAMP") private String timestamp; /** The ip. */ @Column(name = "IP") private String ip; /** The last ping. */ @Column(name = "LAST_PING") private Date lastPing; /** The last ping. */ @Column(name = "CREATED_AT") private Date createdAt = new Date(); /** The last ping. */ @Column(name = "IS_LEADER") private Boolean isLeader = Boolean.FALSE; public Long getId() { return id; } public void setId(final Long id) { this.id = id; } public String getTimestamp() { return timestamp; } public void setTimestamp(final String timestamp) { this.timestamp = timestamp; } public String getIp() { return ip; } public void setIp(final String ip) { this.ip = ip; } public Date getLastPing() { return lastPing; } public void setLastPing(final Date lastPing) { this.lastPing = lastPing; } public Date getCreatedAt() { return createdAt; } public void setCreatedAt(final Date createdAt) { this.createdAt = createdAt; } public Boolean getIsLeader() { return isLeader; } public void setIsLeader(final Boolean isLeader) { this.isLeader = isLeader; } @Override public String toString() { return "SystemNode{" + "id=" + id + ", timestamp='" + timestamp + '\'' + ", ip='" + ip + '\'' + ", lastPing=" + lastPing + ", createdAt=" + createdAt + ", isLeader=" + isLeader + '}'; }
}
2. a) 데이터베이스에 노드를 삽입하고 b) 리더를 확인하는 서비스를 만든다.
@Service @Transactional public class SystemNodeServiceImpl implements SystemNodeService, ApplicationListener { /** The logger. */ private static final Logger LOGGER = Logger.getLogger(SystemNodeService.class); /** The constant NO_ALIVE_NODES. */ private static final String NO_ALIVE_NODES = "Not alive nodes found in list {0}"; /** The ip. */ private String ip; /** The system service. */ private SystemService systemService; /** The system node repository. */ private SystemNodeRepository systemNodeRepository; @Autowired public void setSystemService(final SystemService systemService) { this.systemService = systemService; } @Autowired public void setSystemNodeRepository(final SystemNodeRepository systemNodeRepository) { this.systemNodeRepository = systemNodeRepository; } @Override public void pingNode() { final SystemNode node = systemNodeRepository.findByIp(ip); if (node == null) { createNode(); } else { updateNode(node); } } @Override public void checkLeaderShip() { final List<SystemNode> allList = systemNodeRepository.findAll(); final List<SystemNode> aliveList = filterAliveNodes(allList); SystemNode leader = findLeader(allList); if (leader != null && aliveList.contains(leader)) { setLeaderFlag(allList, Boolean.FALSE); leader.setIsLeader(Boolean.TRUE); systemNodeRepository.save(allList); } else { final SystemNode node = findMinNode(aliveList); setLeaderFlag(allList, Boolean.FALSE); node.setIsLeader(Boolean.TRUE); systemNodeRepository.save(allList); } } /** * Returns the leaded * @param list * the list * @return the leader */ private SystemNode findLeader(final List<SystemNode> list) { for (SystemNode systemNode : list) { if (systemNode.getIsLeader()) { return systemNode; } } return null; } @Override public boolean isLeader() { final SystemNode node = systemNodeRepository.findByIp(ip); return node != null && node.getIsLeader(); } @Override public void onApplicationEvent(final ApplicationEvent applicationEvent) { try { ip = InetAddress.getLocalHost().getHostAddress(); } catch (Exception e) { throw new RuntimeException(e); } if (applicationEvent instanceof ContextRefreshedEvent) { pingNode(); } } /** * Creates the node */ private void createNode() { final SystemNode node = new SystemNode(); node.setIp(ip); node.setTimestamp(String.valueOf(System.currentTimeMillis())); node.setCreatedAt(new Date()); node.setLastPing(new Date()); node.setIsLeader(CollectionUtils.isEmpty(systemNodeRepository.findAll())); systemNodeRepository.save(node); } /** * Updates the node */ private void updateNode(final SystemNode node) { node.setLastPing(new Date()); systemNodeRepository.save(node); } /** * Returns the alive nodes. * * @param list * the list * @return the alive nodes */ private List<SystemNode> filterAliveNodes(final List<SystemNode> list) { int timeout = systemService.getSetting(SettingEnum.SYSTEM_CONFIGURATION_SYSTEM_NODE_ALIVE_TIMEOUT, Integer.class); final List<SystemNode> finalList = new LinkedList<>(); for (SystemNode systemNode : list) { if (!DateUtils.hasExpired(systemNode.getLastPing(), timeout)) { finalList.add(systemNode); } } if (CollectionUtils.isEmpty(finalList)) { LOGGER.warn(MessageFormat.format(NO_ALIVE_NODES, list)); throw new RuntimeException(MessageFormat.format(NO_ALIVE_NODES, list)); } return finalList; } /** * Finds the min name node. * * @param list * the list * @return the min node */ private SystemNode findMinNode(final List<SystemNode> list) { SystemNode min = list.get(0); for (SystemNode systemNode : list) { if (systemNode.getTimestamp().compareTo(min.getTimestamp()) < -1) { min = systemNode; } } return min; } /** * Sets the leader flag. * * @param list * the list * @param value * the value */ private void setLeaderFlag(final List<SystemNode> list, final Boolean value) { for (SystemNode systemNode : list) { systemNode.setIsLeader(value); } }
}
3.ping 데이터베이스가 당신이 살아있는 보내
@Override @Scheduled(cron = "0 0/5 * * * ?") public void executeSystemNodePing() { systemNodeService.pingNode(); } @Override @Scheduled(cron = "0 0/10 * * * ?") public void executeLeaderResolution() { systemNodeService.checkLeaderShip(); }
4. 준비가되었습니다! 작업을 실행하기 전에 리더인지 확인하십시오.
@Override @Scheduled(cron = "*/30 * * * * *") public void executeFailedEmailTasks() { if (checkIfLeader()) { final List<EmailTask> list = emailTaskService.getFailedEmailTasks(); for (EmailTask emailTask : list) { dispatchService.sendEmail(emailTask); } } }
-
==============================
5.잠금을 수행하기 위해 데이터베이스 테이블을 사용하고 있습니다. 한 번에 하나의 작업 만 테이블에 삽입 할 수 있습니다. 다른 하나는 DuplicateKeyException을 얻습니다. 삽입 및 삭제 로직은 @Scheduled 주석 주변의 한 측면으로 전달됩니다. 스프링 부트 2.0을 사용하고 있습니다.
잠금을 수행하기 위해 데이터베이스 테이블을 사용하고 있습니다. 한 번에 하나의 작업 만 테이블에 삽입 할 수 있습니다. 다른 하나는 DuplicateKeyException을 얻습니다. 삽입 및 삭제 로직은 @Scheduled 주석 주변의 한 측면으로 전달됩니다. 스프링 부트 2.0을 사용하고 있습니다.
@Component @Aspect public class SchedulerLock { private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerLock.class); @Autowired private JdbcTemplate jdbcTemplate; @Around("execution(@org.springframework.scheduling.annotation.Scheduled * *(..))") public Object lockTask(ProceedingJoinPoint joinPoint) throws Throwable { String jobSignature = joinPoint.getSignature().toString(); try { jdbcTemplate.update("INSERT INTO scheduler_lock (signature, date) VALUES (?, ?)", new Object[] {jobSignature, new Date()}); Object proceed = joinPoint.proceed(); jdbcTemplate.update("DELETE FROM scheduler_lock WHERE lock_signature = ?", new Object[] {jobSignature}); return proceed; }catch (DuplicateKeyException e) { LOGGER.warn("Job is currently locked: "+jobSignature); return null; } } }
@Component public class EveryTenSecondJob { @Scheduled(cron = "0/10 * * * * *") public void taskExecution() { System.out.println("Hello World"); } }
CREATE TABLE scheduler_lock( signature varchar(255) NOT NULL, date datetime DEFAULT NULL, PRIMARY KEY(signature) );
-
==============================
6.dlock은 데이터베이스 색인 및 제약 조건을 사용하여 한 번만 작업을 실행하도록 설계되었습니다. 다음과 같이 간단하게 할 수 있습니다.
dlock은 데이터베이스 색인 및 제약 조건을 사용하여 한 번만 작업을 실행하도록 설계되었습니다. 다음과 같이 간단하게 할 수 있습니다.
@Scheduled(cron = "30 30 3 * * *") @TryLock(name = "executeMyTask", owner = SERVER_NAME, lockFor = THREE_MINUTES) public void execute() { }
그것을 사용하는 방법에 대한 기사를 참조하십시오.
-
==============================
7.db-scheduler와 같은 임베디드 스케줄러를 사용하여이를 수행 할 수 있습니다. 그것은 영구적 인 실행을 가지며 단순한 낙관적 인 잠금 메커니즘을 사용하여 단일 노드의 실행을 보장합니다.
db-scheduler와 같은 임베디드 스케줄러를 사용하여이를 수행 할 수 있습니다. 그것은 영구적 인 실행을 가지며 단순한 낙관적 인 잠금 메커니즘을 사용하여 단일 노드의 실행을 보장합니다.
유스 케이스를 달성하는 방법에 대한 예제 코드 :
RecurringTask<Void> recurring1 = Tasks.recurring("my-task-name", FixedDelay.of(Duration.ofSeconds(60))) .execute((taskInstance, executionContext) -> { System.out.println("Executing " + taskInstance.getTaskAndInstance()); }); final Scheduler scheduler = Scheduler .create(dataSource) .startTasks(recurring1) .build(); scheduler.start();
from https://stackoverflow.com/questions/31288810/spring-scheduled-task-running-in-clustered-environment by cc-by-sa and MIT license
'SPRING' 카테고리의 다른 글
[SPRING] 내 웹 애플리케이션에 (스프링 보안을 통해) 로그인 한 모든 사용자 목록을 어떻게 표시 할 수 있습니까? (0) | 2018.12.16 |
---|---|
[SPRING] 스프링 기반 SockJS / STOMP 웹 소켓이있는 JSON 웹 토큰 (JWT) (0) | 2018.12.16 |
[SPRING] 스프링 MVC - 날짜 필드 바인딩 (0) | 2018.12.16 |
[SPRING] 다른 익명의 CacheManager는 이미 동일한 VM에 존재합니다 (ehCache 2.5). (0) | 2018.12.16 |
[SPRING] @Transactional 어노테이션은 인터페이스 정의 나 구현 클래스에서 어디에 넣어야합니까? (0) | 2018.12.16 |