AnalysisEvent.java
package com.bioproj.analysis.statemachine;
/**
 * Events accepted by the analysis state machine.
 */
public enum AnalysisEvent {

    /** Run the analysis data. */
    RUN_ANALYSIS_DATA,

    /** The analysis has ended. */
    ANALYSIS_END
}
AnalysisPersist.java
package com.bioproj.analysis.statemachine;
import com.bioproj.analysis.entity.SeqSampleAnalysis;
import com.mbiolance.common.domain.SampleStatus;
import org.springframework.statemachine.StateMachineContext;
import org.springframework.statemachine.StateMachinePersist;
import org.springframework.statemachine.support.DefaultStateMachineContext;
/**
 * {@link StateMachinePersist} implementation that uses a {@link SeqSampleAnalysis}
 * record as the context object.
 * <p>
 * Writing stores nothing; reading builds a context from the status held by the record.
 */
public class AnalysisPersist implements StateMachinePersist<SampleStatus, AnalysisEvent, SeqSampleAnalysis> {

    /**
     * No-op: nothing is written for the given context and record.
     *
     * @param stateMachineContext the context handed over for persisting (ignored)
     * @param seqSampleAnalysis   the analysis record (ignored)
     */
    @Override
    public void write(StateMachineContext<SampleStatus, AnalysisEvent> stateMachineContext,
                      SeqSampleAnalysis seqSampleAnalysis) throws Exception {
        // Deliberately empty body.
    }

    /**
     * Builds a state machine context whose state is the record's current status.
     *
     * @param seqSampleAnalysis the analysis record to read the status from
     * @return a context carrying only the state; event, headers and extended state are {@code null}
     */
    @Override
    public StateMachineContext<SampleStatus, AnalysisEvent> read(SeqSampleAnalysis seqSampleAnalysis) throws Exception {
        SampleStatus currentStatus = seqSampleAnalysis.getStatus();
        return new DefaultStateMachineContext<>(currentStatus, null, null, null);
    }
}
AnalysisPersistConfig.java
package com.bioproj.analysis.statemachine;
import com.bioproj.analysis.entity.SeqSampleAnalysis;
import com.mbiolance.common.domain.SampleStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.statemachine.persist.DefaultStateMachinePersister;
import org.springframework.statemachine.persist.StateMachinePersister;
/**
 * Spring configuration declaring the persistence beans used with the analysis state machine.
 */
@Configuration
public class AnalysisPersistConfig {

    /**
     * @return a new {@link AnalysisPersist} instance, registered as a bean
     */
    @Bean
    public AnalysisPersist reportPersist() {
        return new AnalysisPersist();
    }

    /**
     * @return a {@link DefaultStateMachinePersister} backed by {@link #reportPersist()}
     */
    @Bean
    public StateMachinePersister<SampleStatus, AnalysisEvent, SeqSampleAnalysis> stateMachinePersister() {
        AnalysisPersist persist = reportPersist();
        return new DefaultStateMachinePersister<>(persist);
    }
}
AnalysisStateMachineListener.java
package com.bioproj.analysis.statemachine;
import com.mbiolance.common.domain.SampleStatus;
import lombok.extern.slf4j.Slf4j;
import org.springframework.statemachine.StateMachine;
import org.springframework.statemachine.listener.StateMachineListenerAdapter;
import org.springframework.statemachine.state.State;
import org.springframework.statemachine.transition.Transition;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class AnalysisStateMachineListener extends StateMachineListenerAdapter<SampleStatus, AnalysisEvent> {
@Override
public void transition(Transition<SampleStatus, AnalysisEvent> transition) {
SampleStatus targetStatus = null;
SampleStatus sourceStatus = null;
State<SampleStatus, AnalysisEvent> target = transition.getTarget();
State<SampleStatus, AnalysisEvent> source = transition.getSource();
if (target != null) {
targetStatus = target.getId();
}
if (source != null) {
sourceStatus = source.getId();
}
log.info("分析状态变更,原状态:{},目标状态:{}", sourceStatus, targetStatus);
}
@Override
public void stateMachineError(StateMachine<SampleStatus, AnalysisEvent> stateMachine, Exception exception) {
System.out.println();
}
}
AnalysisStatusMachineConfig.java
package com.bioproj.analysis.statemachine;
import com.mbiolance.common.domain.SampleStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.statemachine.config.EnableStateMachine;
import org.springframework.statemachine.config.StateMachineConfigurerAdapter;
import org.springframework.statemachine.config.builders.StateMachineConfigurationConfigurer;
import org.springframework.statemachine.config.builders.StateMachineStateConfigurer;
import org.springframework.statemachine.config.builders.StateMachineTransitionConfigurer;
import java.util.EnumSet;
/**
 * Declares the analysis state machine: its listener, its states and its transitions.
 * <p>
 * Flow: {@code CREATE --RUN_ANALYSIS_DATA--> ANALYSIS_DOING --ANALYSIS_END--> ANALYSIS_END}.
 */
@Configuration
@EnableStateMachine
public class AnalysisStatusMachineConfig extends StateMachineConfigurerAdapter<SampleStatus, AnalysisEvent> {

    @Autowired
    private AnalysisStateMachineListener listener;

    /**
     * Registers the injected listener on the machine configuration.
     */
    @Override
    public void configure(StateMachineConfigurationConfigurer<SampleStatus, AnalysisEvent> config) throws Exception {
        config.withConfiguration()
                .listener(listener);
    }

    /**
     * Registers every {@link SampleStatus} value as a state, with {@code CREATE} as the
     * initial state and {@code ANALYSIS_END} as the end state.
     */
    @Override
    public void configure(StateMachineStateConfigurer<SampleStatus, AnalysisEvent> states) throws Exception {
        EnumSet<SampleStatus> allStatuses = EnumSet.allOf(SampleStatus.class);
        states.withStates()
                .initial(SampleStatus.CREATE)
                .end(SampleStatus.ANALYSIS_END)
                .states(allStatuses);
    }

    /**
     * Declares the two external transitions of the machine.
     */
    @Override
    public void configure(StateMachineTransitionConfigurer<SampleStatus, AnalysisEvent> transitions) throws Exception {
        transitions
                // CREATE -> ANALYSIS_DOING on RUN_ANALYSIS_DATA
                .withExternal()
                .source(SampleStatus.CREATE)
                .target(SampleStatus.ANALYSIS_DOING)
                .event(AnalysisEvent.RUN_ANALYSIS_DATA)
                .and()
                // ANALYSIS_DOING -> ANALYSIS_END on ANALYSIS_END
                .withExternal()
                .source(SampleStatus.ANALYSIS_DOING)
                .target(SampleStatus.ANALYSIS_END)
                .event(AnalysisEvent.ANALYSIS_END);
    }
}
SeqSampleAnalysisStatusService.java
package com.bioproj.analysis.statemachine;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.extra.spring.SpringUtil;
import com.bioproj.analysis.aop.RedisLock;
import com.bioproj.analysis.entity.SeqSampleAnalysis;
import com.bioproj.analysis.observer.AnalysisObserverHandles;
import com.bioproj.analysis.repository.SeqSampleAnalysisRepository;
import com.bioproj.analysis.service.AbstractSeqSampleAnalysisService;
import com.bioproj.controller.SendKafkaController;
import com.bioproj.domain.vo.SendKafkaVo;
import com.bioproj.pojo.task.Workflow;
import com.bioproj.service.IWorkflowService;
import com.mbiolance.cloud.auth.common.SysUserInfoContext;
import com.mbiolance.cloud.auth.common.SystemRuntimeException;
import com.mbiolance.cloud.sample.rpc.detection.SeqProjectSampleFeignService;
import com.mbiolance.common.domain.AnalysisRunMode;
import com.mbiolance.common.domain.SampleStatus;
import com.mbiolance.common.domain.SeqSampleAnalysisDto;
import com.mbiolance.common.platform.SeqSampleDataDto;
import com.mbiolance.common.rpc.CommonReportFeignService;
import com.mbiolance.common.utils.NumberGenerate;
import com.mbiolance.common.utils.SJBeanUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.statemachine.StateMachine;
import org.springframework.statemachine.annotation.OnTransition;
import org.springframework.statemachine.annotation.WithStateMachine;
import org.springframework.statemachine.persist.StateMachinePersister;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.Assert;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.*;
/**
 * Hosts the transition handlers of the analysis state machine and the entry point
 * ({@link #sendEvent0}) used to fire events for a given analysis record.
 * <p>
 * NOTE(review): a single injected {@link StateMachine} instance is restored per call, while the
 * {@code @RedisLock} key is built from the analysis number. If that lock is indeed scoped per
 * analysis number, calls for different analyses could use the machine concurrently - confirm.
 */
@Slf4j
@Component
@WithStateMachine
public class SeqSampleAnalysisStatusService extends AbstractSeqSampleAnalysisService<SeqSampleAnalysis> {

    @Autowired
    private StateMachine<SampleStatus, AnalysisEvent> stateMachine;

    @Autowired
    private StateMachinePersister<SampleStatus, AnalysisEvent, SeqSampleAnalysis> stateMachinePersister;

    @Autowired
    private CommonReportFeignService commonReportFeignService;

    @Autowired
    private SeqSampleAnalysisRepository seqSampleAnalysisRepository;

    @Autowired
    private AnalysisObserverHandles analysisObserverHandles;

    @Autowired
    private SendKafkaController sendKafkaController;

    /**
     * Handler for the {@code CREATE -> ANALYSIS_DOING} transition.
     * <p>
     * Hands the analysis over to {@code sendKafkaController}, then reloads the record, sets its
     * status to {@code ANALYSIS_DOING}, saves it and notifies the observers.
     * <p>
     * NOTE(review): the message built in {@link #sendEvent0} carries an {@link AnalysisEvent}
     * payload; only headers are read here, so the declared type parameter is not relied upon.
     *
     * @param message message whose "seqSampleAnalysis" header holds the record and whose "objs"
     *                header may hold extra values (only "clusterId" is read, for logging)
     * @return {@code false} when the record header is missing, has no id, or the hand-over throws;
     *         {@code true} otherwise
     * @throws SystemRuntimeException when the record cannot be found by id
     */
    @OnTransition(source = "CREATE", target = "ANALYSIS_DOING")
    public boolean runAnalysisDataTransition(Message<SeqSampleAnalysis> message) {
        SeqSampleAnalysis seqSampleAnalysis = (SeqSampleAnalysis) message.getHeaders().get("seqSampleAnalysis");
        if (seqSampleAnalysis == null || seqSampleAnalysis.getId() == null) {
            return false;
        }
        String clusterId = headerText(this.getObjs(message), "clusterId");
        try {
            log.info("添加到kafka:{},cluster:{}", seqSampleAnalysis.getAnalysisNumber(), clusterId);
            sendKafkaController.runAnalysisData(SendKafkaVo.builder()
                    .workflowId(seqSampleAnalysis.getSxProcessId())
                    .analysisNumber(seqSampleAnalysis.getAnalysisNumber())
                    .experimentNumber(seqSampleAnalysis.getExperimentNumber())
                    .sampleNumber(seqSampleAnalysis.getSampleNumber())
                    .fastq1(seqSampleAnalysis.getFastq1())
                    .fastq2(seqSampleAnalysis.getFastq2())
                    .build());
        } catch (Exception e) {
            // Best effort is kept (return false), but the failure goes to the logger, not stderr.
            log.error("Failed to hand analysis {} over to kafka", seqSampleAnalysis.getAnalysisNumber(), e);
            return false;
        }
        SeqSampleAnalysis analysis = seqSampleAnalysisRepository.findById(seqSampleAnalysis.getId())
                .orElseThrow(() -> new SystemRuntimeException("数据不存在"));
        analysis.setStatus(SampleStatus.ANALYSIS_DOING);
        analysis.setUpdateTime(new Date());
        seqSampleAnalysisRepository.save(analysis);
        analysisObserverHandles.notifyRunAnalysisData(analysis, SysUserInfoContext.getUser());
        return true;
    }

    /**
     * Handler for the {@code ANALYSIS_DOING -> ANALYSIS_END} transition.
     * <p>
     * Requests report creation through {@code commonReportFeignService}, then sets the record's
     * status to {@code ANALYSIS_END}, saves and flushes it, and notifies the observers with the
     * status the record had before the change.
     *
     * @param message message whose "seqSampleAnalysis" header holds the record and whose "objs"
     *                header may hold a "templateNumber" value
     * @return {@code false} when the record header is missing or has no id; {@code true} otherwise
     */
    @OnTransition(source = "ANALYSIS_DOING", target = "ANALYSIS_END")
    public boolean analysisEndTransition(Message<SeqSampleAnalysis> message) {
        SeqSampleAnalysis seqSampleAnalysis = (SeqSampleAnalysis) message.getHeaders().get("seqSampleAnalysis");
        if (seqSampleAnalysis == null || seqSampleAnalysis.getId() == null) {
            return false;
        }
        SampleStatus originStatus = seqSampleAnalysis.getStatus();
        String templateNumber = headerText(this.getObjs(message), "templateNumber");
        SeqSampleAnalysisDto seqSampleAnalysisDto = BeanUtil.copyProperties(seqSampleAnalysis, SeqSampleAnalysisDto.class);
        commonReportFeignService.createReport(seqSampleAnalysisDto, templateNumber);
        seqSampleAnalysis.setStatus(SampleStatus.ANALYSIS_END);
        seqSampleAnalysis.setUpdateTime(new Date());
        seqSampleAnalysisRepository.saveAndFlush(seqSampleAnalysis);
        analysisObserverHandles.notifyAnalysisEnd(seqSampleAnalysis, originStatus, SysUserInfoContext.getUser());
        return true;
    }

    /**
     * Reads the "objs" header as a map.
     *
     * @return the converted header, or an empty map when the conversion yields {@code null}
     *         (e.g. the header is absent), so callers can look keys up without a null check
     */
    private Map<?, ?> getObjs(Message<?> message) {
        Object obj = message.getHeaders().get("objs");
        Map<?, ?> converted = Convert.convert(Map.class, obj);
        if (converted == null) {
            return Collections.emptyMap();
        }
        return converted;
    }

    /**
     * Returns the value stored under {@code key} as text, or {@code null} when it is empty.
     */
    private static String headerText(Map<?, ?> objs, String key) {
        Object value = objs.get(key);
        return ObjectUtil.isEmpty(value) ? null : String.valueOf(value);
    }

    /**
     * Fires {@code event} for the given analysis record.
     * <p>
     * Starts the machine, restores it from the record through the persister, sends the event
     * (with the record and {@code objs} attached as headers) and persists the machine again.
     * Any exception is logged and leaves the result at its current value.
     *
     * @param seqSampleAnalysis the analysis record the event applies to
     * @param event             the event to send
     * @param objs              extra values passed along in the "objs" header
     * @return what {@code stateMachine.sendEvent} returned, or {@code false} when an exception
     *         occurred before the event was sent
     */
    @RedisLock(value = "'analysisEvent_' + #seqSampleAnalysis.analysisNumber", isBlocked = false)
    public boolean sendEvent0(SeqSampleAnalysis seqSampleAnalysis, AnalysisEvent event, Map<String, Object> objs) {
        Message<AnalysisEvent> message = MessageBuilder.withPayload(event)
                .setHeader("seqSampleAnalysis", seqSampleAnalysis)
                .setHeader("objs", objs)
                .build();
        boolean result = false;
        try {
            stateMachine.start();
            stateMachinePersister.restore(stateMachine, seqSampleAnalysis);
            result = stateMachine.sendEvent(message);
            stateMachinePersister.persist(stateMachine, seqSampleAnalysis);
        } catch (Exception e) {
            Object analysisNumber = seqSampleAnalysis == null ? null : seqSampleAnalysis.getAnalysisNumber();
            log.error("Failed to process event {} for analysis {}", event, analysisNumber, e);
        } finally {
            // The machine is stopped only once the record has reached ANALYSIS_END.
            if (Objects.nonNull(seqSampleAnalysis) && Objects.equals(seqSampleAnalysis.getStatus(), SampleStatus.ANALYSIS_END)) {
                stateMachine.stop();
            }
        }
        return result;
    }
}