自定义类加载器导致的MetaspaceOOM

2019-10-19  本文已影响0人  todd5167
问题描述

在提交flinkStreamSQL时,抛出异常信息java.lang.OutOfMemoryError: Metaspace,(已设置-XX:MaxMetaspaceSize=400M 不让其动态增长)很明显元数据溢出,猜测是加载的类过多。通过获取堆转存快照发现,堆被大量DtClassLoader对象占据,这才意识到我们没有对DtClassLoader做相关缓存处理,导致每次提交任务都会创建出DtClassLoader对象,并由该对象加载我们的插件类。

使用ClassLoaderManager统一管理
@FunctionalInterface
public interface ClassLoaderSupplier<T> {
    /**
     * Gets a result.
     *
     * @return a result
     */
    T get(ClassLoader cl) throws Exception;
}
private static Map<String, DtClassLoader> pluginClassLoader = new ConcurrentHashMap<>();

public static <R> R newInstance(String pluginJarPath, ClassLoaderSupplier<R> supplier) throws Exception {
    ClassLoader classLoader = retrieveClassLoad(pluginJarPath);
    return ClassLoaderSupplierCallBack.callbackAndReset(supplier, classLoader);
}
//  get or create 
private static DtClassLoader retrieveClassLoad(String pluginJarPath) {
    return pluginClassLoader.computeIfAbsent(pluginJarPath, k -> {
        try {
            URL[] urls = PluginUtil.getPluginJarUrls(pluginJarPath);
            ClassLoader parentClassLoader = Thread.currentThread().getContextClassLoader();
            DtClassLoader classLoader = new DtClassLoader(urls, parentClassLoader);
            LOG.info("pluginJarPath:{} create ClassLoad successful...", pluginJarPath);
            return classLoader;
        } catch (Throwable e) {
            LOG.error("retrieve ClassLoad happens error:{}", e);
            throw new RuntimeException("retrieve ClassLoad happens error");
        }
    });
}
public class ClassLoaderSupplierCallBack {
    public static <R> R callbackAndReset(ClassLoaderSupplier<R> supplier, ClassLoader toSetClassLoader) throws Exception {
        ClassLoader oldClassLoader = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(toSetClassLoader);
        try {
            return supplier.get(toSetClassLoader);
        } finally {
            Thread.currentThread().setContextClassLoader(oldClassLoader);
        }
    }
}
ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader();
    DtClassLoader parentClassloader = new DtClassLoader(new URL[]{}, threadClassLoader);
    Thread.currentThread().setContextClassLoader(parentClassloader);
    
private static AsyncReqRow loadAsyncReq(String sideType, String sqlRootDir, RowTypeInfo rowTypeInfo,
                                        JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) throws Exception {
    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
    String pathOfType = String.format(PATH_FORMAT, sideType);
    String pluginJarPath = PluginUtil.getJarFileDirPath(pathOfType, sqlRootDir);
    DtClassLoader dtClassLoader = (DtClassLoader) classLoader;
    PluginUtil.addPluginJar(pluginJarPath, dtClassLoader);
    String className = PluginUtil.getSqlSideClassName(sideType, "side", OPERATOR_TYPE);
    return dtClassLoader.loadClass(className).asSubclass(AsyncReqRow.class)
            .getConstructor(RowTypeInfo.class, JoinInfo.class, List.class, SideTableInfo.class).newInstance(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
}

转换后

private static AsyncReqRow loadAsyncReq(String sideType, String sqlRootDir, RowTypeInfo rowTypeInfo,
                                        JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) throws Exception {
    String pathOfType = String.format(PATH_FORMAT, sideType);
    String pluginJarPath = PluginUtil.getJarFileDirPath(pathOfType, sqlRootDir);
    String className = PluginUtil.getSqlSideClassName(sideType, "side", OPERATOR_TYPE);
    return ClassLoaderManager.newInstance(pluginJarPath, (cl) ->
            cl.loadClass(className).asSubclass(AsyncReqRow.class)
                    .getConstructor(RowTypeInfo.class, JoinInfo.class, List.class, SideTableInfo.class)
                    .newInstance(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
} 
上一篇下一篇

猜你喜欢

热点阅读