Preface
This article takes a look at PowerJob's ServerDeployContainerRequest.
ServerDeployContainerRequest
tech/powerjob/common/request/ServerDeployContainerRequest.java
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ServerDeployContainerRequest implements PowerSerializable {
    /**
     * Container ID
     */
    private Long containerId;
    /**
     * Container name
     */
    private String containerName;
    /**
     * File name (MD5 value), used for version checking and file download
     */
    private String version;
    /**
     * Download URL
     */
    private String downloadURL;
}
ServerDeployContainerRequest defines four properties: containerId, containerName, version and downloadURL.
onReceiveServerDeployContainerRequest
tech/powerjob/worker/actors/WorkerActor.java
@Handler(path = WORKER_HANDLER_DEPLOY_CONTAINER)
public void onReceiveServerDeployContainerRequest(ServerDeployContainerRequest request) {
    OmsContainerFactory.deployContainer(request);
}
WorkerActor's onReceiveServerDeployContainerRequest handles a ServerDeployContainerRequest by delegating to OmsContainerFactory.deployContainer.
deployContainer
tech/powerjob/worker/container/OmsContainerFactory.java
public static synchronized void deployContainer(ServerDeployContainerRequest request) {
    Long containerId = request.getContainerId();
    String containerName = request.getContainerName();
    String version = request.getVersion();
    log.info("[OmsContainer-{}] start to deploy container(name={},version={},downloadUrl={})", containerId, containerName, version, request.getDownloadURL());
    OmsContainer oldContainer = CARGO.get(containerId);
    if (oldContainer != null && version.equals(oldContainer.getVersion())) {
        log.info("[OmsContainer-{}] version={} already deployed, so skip this deploy task.", containerId, version);
        return;
    }
    String filePath = CONTAINER_DIR + containerId + "/" + version + ".jar";
    // download the container jar to the local disk
    File jarFile = new File(filePath);
    try {
        if (!jarFile.exists()) {
            FileUtils.forceMkdirParent(jarFile);
            FileUtils.copyURLToFile(new URL(request.getDownloadURL()), jarFile, 5000, 300000);
            log.info("[OmsContainer-{}] download jar successfully, path={}", containerId, jarFile.getPath());
        }
        // create the new container
        OmsContainer newContainer = new OmsJarContainer(containerId, containerName, version, jarFile);
        newContainer.init();
        // replace the container
        CARGO.put(containerId, newContainer);
        log.info("[OmsContainer-{}] deployed new version:{} successfully!", containerId, version);
        if (oldContainer != null) {
            // destroy the old container
            oldContainer.destroy();
        }
    } catch (Exception e) {
        log.error("[OmsContainer-{}] deployContainer(name={},version={}) failed.", containerId, containerName, version, e);
        // if the deployment fails, delete the jar (the failure may come from a bad download; keeping the file would make this version permanently undeployable)
        CommonUtils.executeIgnoreException(() -> FileUtils.forceDelete(jarFile));
    }
}
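deployContainer first looks up the old OmsContainer in CARGO and compares versions; if they match, the deployment is skipped. Otherwise it looks for the jar on the local disk and, if it is missing, downloads it from downloadURL. It then creates an OmsJarContainer, calls its init method, puts the new container into CARGO, and finally calls destroy on the old OmsContainer if there is one. If any step throws, the jar file is deleted so that the same version can be deployed again later.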
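The "skip if same version, initialize first, then swap, then destroy the old one" ordering can be sketched in isolation. This is a minimal illustration, not PowerJob code: SwapRegistry and Unit are hypothetical stand-ins for OmsContainerFactory and OmsContainer, and downloading and deleting the jar are left out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical registry that mimics the deploy ordering described above. */
final class SwapRegistry {

    /** Hypothetical stand-in for a container; a real one would load a jar in init(). */
    static final class Unit {
        final String version;
        final boolean failOnInit;
        boolean initialized;
        boolean destroyed;

        Unit(String version, boolean failOnInit) {
            this.version = version;
            this.failOnInit = failOnInit;
        }

        void init() {
            if (failOnInit) {
                throw new IllegalStateException("init failed for version " + version);
            }
            initialized = true;
        }

        void destroy() {
            destroyed = true;
        }
    }

    private final Map<Long, Unit> cargo = new ConcurrentHashMap<>();

    /** Returns true only if the candidate became the active unit. */
    synchronized boolean deploy(long id, Unit candidate) {
        Unit old = cargo.get(id);
        if (old != null && candidate.version.equals(old.version)) {
            return false; // same version already deployed, skip
        }
        try {
            candidate.init(); // initialize before the swap
        } catch (RuntimeException e) {
            return false; // the old unit stays registered and untouched
        }
        cargo.put(id, candidate); // swap in the new unit
        if (old != null) {
            old.destroy(); // the old unit is destroyed only after the swap
        }
        return true;
    }

    Unit current(long id) {
        return cargo.get(id);
    }
}
```

Because the new unit is initialized before it is put into the map, a failed initialization leaves the previously deployed version in service.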
OmsContainer
tech/powerjob/worker/container/OmsContainer.java
public interface OmsContainer extends LifeCycle {
    /**
     * Get a processor
     * @param className fully qualified class name
     * @return the processor (may be an MR, BD or other kind of processor)
     */
    BasicProcessor getProcessor(String className);
    /**
     * Get the container's class loader
     * @return the class loader
     */
    OhMyClassLoader getContainerClassLoader();
    Long getContainerId();
    Long getDeployedTime();
    String getName();
    String getVersion();
    /**
     * Try to release the container's resources
     */
    void tryRelease();
}
The OmsContainer interface defines the methods getProcessor, getContainerClassLoader, getContainerId, getDeployedTime, getName, getVersion and tryRelease.
OmsJarContainer
tech/powerjob/worker/container/OmsJarContainer.java
@Slf4j
public class OmsJarContainer implements OmsContainer {
    private final Long containerId;
    private final String name;
    private final String version;
    private final File localJarFile;
    private final Long deployedTime;
    // reference counter
    private final AtomicInteger referenceCount = new AtomicInteger(0);
    private OhMyClassLoader containerClassLoader;
    private ClassPathXmlApplicationContext container;
    private final Map<String, BasicProcessor> processorCache = Maps.newConcurrentMap();
    public OmsJarContainer(Long containerId, String name, String version, File localJarFile) {
        this.containerId = containerId;
        this.name = name;
        this.version = version;
        this.localJarFile = localJarFile;
        this.deployedTime = System.currentTimeMillis();
    }
    //......
}
OmsJarContainer implements the OmsContainer interface.
getProcessor
public BasicProcessor getProcessor(String className) {
    BasicProcessor basicProcessor = processorCache.computeIfAbsent(className, ignore -> {
        Class<?> targetClass;
        try {
            targetClass = containerClassLoader.loadClass(className);
        } catch (ClassNotFoundException cnf) {
            log.error("[OmsJarContainer-{}] can't find class: {} in container.", containerId, className);
            return null;
        }
        // first try to load the instance from the Spring IoC container
        try {
            return (BasicProcessor) container.getBean(targetClass);
        } catch (BeansException be) {
            log.warn("[OmsJarContainer-{}] load instance from spring container failed, try to build instance directly.", containerId);
        } catch (ClassCastException cce) {
            log.error("[OmsJarContainer-{}] {} should implements the Processor interface!", containerId, className);
            return null;
        } catch (Exception e) {
            log.error("[OmsJarContainer-{}] get bean failed for {}.", containerId, className, e);
            return null;
        }
        // instantiate directly
        try {
            Object obj = targetClass.getDeclaredConstructor().newInstance();
            return (BasicProcessor) obj;
        } catch (Exception e) {
            log.error("[OmsJarContainer-{}] load {} failed", containerId, className, e);
        }
        return null;
    });
    if (basicProcessor != null) {
        // reference count + 1
        referenceCount.getAndIncrement();
    }
    return basicProcessor;
}
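getProcessor first loads the processor class through containerClassLoader.loadClass and returns null if the class cannot be found. It then looks up a bean of that type in the Spring container; if the lookup fails with a BeansException, it falls back to instantiating the class directly via targetClass.getDeclaredConstructor().newInstance(). The result is cached in processorCache through computeIfAbsent, and every successful call increments referenceCount.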
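The lookup order (cache, then a primary source, then reflection) can be tried out without Spring. The following is a minimal sketch under stated assumptions: FallbackInstanceCache and primaryLookup are hypothetical names, primaryLookup stands in for container.getBean and signals a miss by returning null rather than throwing, and Class.forName replaces the container class loader.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical cache that resolves instances by class name with a reflective fallback. */
final class FallbackInstanceCache {
    private final Map<String, Object> cache = new ConcurrentHashMap<>();
    // Stand-in for a bean lookup; returns null when it has no instance of the type.
    private final Function<Class<?>, Object> primaryLookup;

    FallbackInstanceCache(Function<Class<?>, Object> primaryLookup) {
        this.primaryLookup = primaryLookup;
    }

    Object get(String className) {
        // When the mapping function returns null, computeIfAbsent stores nothing,
        // so a failed lookup is attempted again on the next call.
        return cache.computeIfAbsent(className, name -> {
            Class<?> type;
            try {
                type = Class.forName(name);
            } catch (ClassNotFoundException e) {
                return null; // unknown class
            }
            Object fromPrimary = primaryLookup.apply(type);
            if (fromPrimary != null) {
                return fromPrimary;
            }
            try {
                // Fallback: needs an accessible no-argument constructor.
                return type.getDeclaredConstructor().newInstance();
            } catch (ReflectiveOperationException e) {
                return null;
            }
        });
    }

    int size() {
        return cache.size();
    }
}
```

One consequence of this design, which also applies to any ConcurrentMap-based cache, is that failures are never cached: a class that could not be resolved is looked up again on every call.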
init
public void init() throws Exception {
    log.info("[OmsJarContainer-{}] start to init container(name={},jarPath={})", containerId, name, localJarFile.getPath());
    URL jarURL = localJarFile.toURI().toURL();
    // create the class loader (its parent is the Worker's class loader)
    this.containerClassLoader = new OhMyClassLoader(new URL[]{jarURL}, this.getClass().getClassLoader());
    // parse the properties
    Properties properties = new Properties();
    try (InputStream propertiesURLStream = containerClassLoader.getResourceAsStream(ContainerConstant.CONTAINER_PROPERTIES_FILE_NAME)) {
        if (propertiesURLStream == null) {
            log.error("[OmsJarContainer-{}] can't find {} in jar {}.", containerId, ContainerConstant.CONTAINER_PROPERTIES_FILE_NAME, localJarFile.getPath());
            throw new PowerJobException("invalid jar file because of no " + ContainerConstant.CONTAINER_PROPERTIES_FILE_NAME);
        }
        properties.load(propertiesURLStream);
        log.info("[OmsJarContainer-{}] load container properties successfully: {}", containerId, properties);
    }
    String packageName = properties.getProperty(ContainerConstant.CONTAINER_PACKAGE_NAME_KEY);
    if (StringUtils.isEmpty(packageName)) {
        log.error("[OmsJarContainer-{}] get package name failed, developer should't modify the properties file!", containerId);
        throw new PowerJobException("invalid jar file");
    }
    // load the user classes
    containerClassLoader.load(packageName);
    // create the Spring IoC container (the Spring config file must be given as a relative path)
    // the thread context class loader has to be switched so that JDBC drivers can be loaded (SPI)
    ClassLoader oldCL = Thread.currentThread().getContextClassLoader();
    Thread.currentThread().setContextClassLoader(containerClassLoader);
    try {
        this.container = new ClassPathXmlApplicationContext(new String[]{ContainerConstant.SPRING_CONTEXT_FILE_NAME}, false);
        this.container.setClassLoader(containerClassLoader);
        this.container.refresh();
    } finally {
        Thread.currentThread().setContextClassLoader(oldCL);
    }
    log.info("[OmsJarContainer-{}] init container(name={},jarPath={}) successfully", containerId, name, localJarFile.getPath());
}
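init creates an OhMyClassLoader from the jar's location, then reads oms-worker-container.properties through it with properties.load(propertiesURLStream). It takes the configured packageName and calls containerClassLoader.load(packageName) to load the user classes. With the thread context class loader temporarily switched to the container class loader, it then creates a Spring ClassPathXmlApplicationContext from oms-worker-container-spring-context.xml, sets its class loader, and calls refresh; the previous context class loader is restored in a finally block.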
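The save, set and restore handling of the thread context class loader is a general pattern and can be factored into a small helper. A minimal sketch, assuming a hypothetical ContextClassLoaderSwitch class that is not part of PowerJob:

```java
import java.util.function.Supplier;

/** Hypothetical helper that runs an action under a given thread context class loader. */
final class ContextClassLoaderSwitch {

    private ContextClassLoaderSwitch() {
    }

    static <T> T callWith(ClassLoader loader, Supplier<T> action) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            // Code that discovers classes via the context class loader
            // (for example ServiceLoader-based SPI lookups) now sees 'loader'.
            return action.get();
        } finally {
            // Always restore, even if the action throws.
            current.setContextClassLoader(previous);
        }
    }
}
```

Restoring in finally matters because worker threads are typically reused; a leaked context class loader would affect unrelated work on the same thread and keep the container's classes reachable.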
destroy
public void destroy() throws Exception {
    // destroy is only allowed when there are no remaining references
    if (referenceCount.get() <= 0) {
        try {
            if (localJarFile.exists()) {
                FileUtils.forceDelete(localJarFile);
            }
        } catch (Exception e) {
            log.warn("[OmsJarContainer-{}] delete jarFile({}) failed.", containerId, localJarFile.getPath(), e);
        }
        try {
            processorCache.clear();
            container.close();
            containerClassLoader.close();
            log.info("[OmsJarContainer-{}] container destroyed successfully", containerId);
        } catch (Exception e) {
            log.error("[OmsJarContainer-{}] container destroyed failed", containerId, e);
        }
        return;
    }
    log.warn("[OmsJarContainer-{}] container's reference count is {}, won't destroy now!", containerId, referenceCount.get());
}
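When referenceCount is less than or equal to 0, destroy first deletes localJarFile and then calls processorCache.clear(), close on the ClassPathXmlApplicationContext and close on the OhMyClassLoader. If the container is still referenced, it only logs a warning and destroys nothing.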
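The guard in destroy can be illustrated with a small reference-counted resource. This is a sketch under assumptions: RefCountedResource is a hypothetical class, and its release method is invented for the example, since the code shown in this article only increments the counter and does not show where it is decremented.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical resource that refuses to be destroyed while it is still referenced. */
final class RefCountedResource {
    private final AtomicInteger referenceCount = new AtomicInteger(0);
    private volatile boolean destroyed;

    void acquire() {
        referenceCount.incrementAndGet();
    }

    // Assumption: callers release what they acquired; not shown in the article's code.
    void release() {
        referenceCount.decrementAndGet();
    }

    /** Returns true if the resource was destroyed, false if it is still in use. */
    boolean destroy() {
        if (referenceCount.get() > 0) {
            return false; // still referenced, do nothing for now
        }
        destroyed = true; // a real implementation would close files and class loaders here
        return true;
    }

    boolean isDestroyed() {
        return destroyed;
    }
}
```

Note that the check and the cleanup are not one atomic step here; in this sketch a concurrent acquire could slip in between them, so callers would need external synchronization if that matters.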
JarContainerProcessorFactory
tech/powerjob/worker/processor/impl/JarContainerProcessorFactory.java
@Slf4j
public class JarContainerProcessorFactory implements ProcessorFactory {
    private final WorkerRuntime workerRuntime;
    public JarContainerProcessorFactory(WorkerRuntime workerRuntime) {
        this.workerRuntime = workerRuntime;
    }
    @Override
    public Set<String> supportTypes() {
        return Sets.newHashSet(ProcessorType.EXTERNAL.name());
    }
    @Override
    public ProcessorBean build(ProcessorDefinition processorDefinition) {
        String processorInfo = processorDefinition.getProcessorInfo();
        String[] split = processorInfo.split("#");
        String containerName = split[0];
        String className = split[1];
        log.info("[ProcessorFactory] try to load processor({}) in container({})", className, containerName);
        OmsContainer omsContainer = OmsContainerFactory.fetchContainer(Long.valueOf(containerName), workerRuntime);
        if (omsContainer != null) {
            return new ProcessorBean()
                    .setProcessor(omsContainer.getProcessor(className))
                    .setClassLoader(omsContainer.getContainerClassLoader());
        } else {
            log.warn("[ProcessorFactory] load container failed. processor info : {}", processorInfo);
        }
        return null;
    }
}
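JarContainerProcessorFactory's build method splits processorInfo on # to obtain the container ID and the className (the variable is named containerName, but its value is parsed with Long.valueOf and used as the ID). It then looks up the container through OmsContainerFactory.fetchContainer and obtains the processor through omsContainer.getProcessor(className). Its supportTypes is EXTERNAL, that is, external processors that are loaded dynamically.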
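The build method indexes split[1] directly, so a processorInfo without # would fail with an ArrayIndexOutOfBoundsException. A more defensive parse could look like the following sketch; ProcessorInfo is a hypothetical helper, not a PowerJob class, and the containerId#className format is taken from the code above.

```java
import java.util.Objects;

/** Hypothetical value holder for a parsed "containerId#className" string. */
final class ProcessorInfo {
    final long containerId;
    final String className;

    private ProcessorInfo(long containerId, String className) {
        this.containerId = containerId;
        this.className = className;
    }

    static ProcessorInfo parse(String processorInfo) {
        Objects.requireNonNull(processorInfo, "processorInfo");
        // A limit of 2 keeps any further '#' inside the class-name part.
        String[] split = processorInfo.split("#", 2);
        if (split.length != 2 || split[0].isEmpty() || split[1].isEmpty()) {
            throw new IllegalArgumentException(
                    "expected <containerId>#<className>, got: " + processorInfo);
        }
        try {
            return new ProcessorInfo(Long.parseLong(split[0].trim()), split[1].trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("container id is not a number: " + split[0], e);
        }
    }
}
```

For example, ProcessorInfo.parse("18#com.example.DemoProcessor") yields containerId 18 and className com.example.DemoProcessor, while an input without a separator is rejected with an IllegalArgumentException.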
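Summary
WorkerActor's onReceiveServerDeployContainerRequest handles a ServerDeployContainerRequest by delegating to OmsContainerFactory.deployContainer. deployContainer looks up the old OmsContainer and compares versions; if they match, nothing is redeployed. Otherwise it looks for the jar locally, downloads it from downloadURL if it is missing, creates an OmsJarContainer, calls its init method, and calls destroy on the old OmsContainer if one exists. init creates an OhMyClassLoader from the jar's location, creates a ClassPathXmlApplicationContext, sets its class loader, and calls refresh. When referenceCount is less than or equal to 0, destroy deletes localJarFile and then calls processorCache.clear(), close on the ClassPathXmlApplicationContext and close on the OhMyClassLoader.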