当前位置: 首页 > news >正文

设计师关注的十大网站最美情侣免费播放视频大全

设计师关注的十大网站,最美情侣免费播放视频大全,下载学校网站模板下载安装,国家建设执业注册中心网站系列文章目录 一、DataX详解和架构介绍 二、DataX源码分析 JobContainer 三、DataX源码分析 TaskGroupContainer 四、DataX源码分析 TaskExecutor 五、DataX源码分析 reader 六、DataX源码分析 writer 七、DataX源码分析 Channel 八、DataX源码分析-插件机制 文章目录 系列文章…系列文章目录 一、DataX详解和架构介绍 二、DataX源码分析 JobContainer 三、DataX源码分析 TaskGroupContainer 四、DataX源码分析 TaskExecutor 五、DataX源码分析 reader 六、DataX源码分析 writer 七、DataX源码分析 Channel 八、DataX源码分析-插件机制 文章目录 系列文章目录前言一、插件分类插件目录结构插件加载原理 前言 DataX的插件机制是其核心特性之一它使得DataX能够灵活地适应各种不同的数据源的数据同步。这一机制主要基于插件开发框架该框架主要包括Reader插件、Transformer插件、Writer插件。 DataX的插件机制还采用了框架插件的架构。框架负责连接Reader和Writer插件作为两者的数据传输通道并处理缓冲、流控、并发、数据转换等核心技术问题。这种架构使得插件只需关心数据的读取或写入本身而同步的共性问题则由框架来处理。 此外DataX的插件机制还具有良好的扩展性和可维护性。开发者可以根据需要开发新的Reader或Writer插件来支持新的数据源类型而无需修改DataX的核心框架代码。这种插件化的设计使得DataX能够适应不断变化的业务需求和技术环境。 在插件的加载和初始化方面DataX使用了类似Java SPIService Provider Interface的机制。它会在指定的插件目录中查找并加载插件然后将其注册到插件注册中心。这样当需要使用某个插件时就可以从注册中心中获取其实例并进行相应的操作。 总的来说DataX的插件机制是一种非常灵活和可扩展的设计它使得DataX能够适应各种不同的数据源和数据存储需求同时也为开发者提供了丰富的扩展和定制化的可能性。 一、插件分类 按照功能分 reader 读插件例如mysqlReader从mysql读取数据 writer 写插件。例如mysqlWriter给mysql写入数据 transformer 中间结果转换例如SubstrTransformer用于字符截取 按照运行类型分 Job级别的插件 Task级别的插件 插件目录结构 datax\plugin下分2个reader和writer目录下面以mysql为例 plugin.json内容 {name: mysqlreader,class: xxx.plugin.reader.mysqlreader.MysqlReader,description: useScene: prod. mechanism: Jdbc connection using the database, execute select sql, retrieve data from the ResultSet. warn: The more you know about the database, the less problems you encounter.,developer: xx }插件加载原理 DataX进程启动入口为com.alibaba.datax.core.Engineengine.entry() public static void entry(final String[] args) throws Throwable {Options options new Options();options.addOption(job, true, Job config.);options.addOption(jobid, true, Job unique id.);options.addOption(mode, true, Job runtime mode.);BasicParser parser new BasicParser();CommandLine cl parser.parse(options, args);String jobPath cl.getOptionValue(job);// 如果用户没有明确指定jobid, 则 datax.py 会指定 jobid 默认值为-1String jobIdString cl.getOptionValue(jobid);RUNTIME_MODE cl.getOptionValue(mode);Configuration configuration ConfigParser.parse(jobPath);}读取并解析插件配置 ConfigParser.parse(final String jobPath)传入job路径该方法组装解析最后返回一个Configuration对象Configuration里解析出了readerwriterhandler等插件名称提取完插件名称后会去reader目录和writer目录寻找插件的位置。动态加载插件 插件的加载都是通过自定义类加载器JarLoader动态加载,提供插件相关Jar隔离的加载机制。插件的加载接口由LoadUtil类负责当要加载一个插件时需要实例化一个JarLoader然后切换thread class loader之后才加载插件。这个主要由ClassLoaderSwapper实现。JarLoader类 JarLoader 负责加载指定路径下的插件 JAR 文件。它会检查 JAR 文件的合法性、有效性以及是否包含必要的插件实现类。继承自URLClassLoader提供Jar隔离的加载机制会把传入的路径、及其子路径、以及路径中的jar文件加入到class path。 /*** 提供Jar隔离的加载机制会把传入的路径、及其子路径、以及路径中的jar文件加入到class path。*/ public class JarLoader extends URLClassLoader{public JarLoader(String[] paths) {this(paths, JarLoader.class.getClassLoader());}public JarLoader(String[] paths, ClassLoader parent) {super(getURLs(paths), parent);}private static URL[] getURLs(String[] paths) {Validate.isTrue(null ! paths 0 ! paths.length,jar包路径不能为空.);ListString dirs new ArrayListString();for (String path : paths) {dirs.add(path);JarLoader.collectDirs(path, dirs);}ListURL urls new ArrayListURL();for (String path : dirs) {urls.addAll(doGetURLs(path));}return urls.toArray(new URL[0]);}private static void collectDirs(String path, ListString collector) {if (null path || StringUtils.isBlank(path)) {return;}File current new File(path);if (!current.exists() || !current.isDirectory()) {return;}for (File child : current.listFiles()) {if (!child.isDirectory()) {continue;}collector.add(child.getAbsolutePath());collectDirs(child.getAbsolutePath(), collector);}}private static ListURL doGetURLs(final String path) {Validate.isTrue(!StringUtils.isBlank(path), jar包路径不能为空.);File jarPath new File(path);Validate.isTrue(jarPath.exists() jarPath.isDirectory(),jar包路径必须存在且为目录.);/* set filter */FileFilter jarFilter new FileFilter() {Overridepublic boolean accept(File pathname) {return pathname.getName().endsWith(.jar);}};/* iterate all jar */File[] allJars new File(path).listFiles(jarFilter);ListURL jarURLs new ArrayListURL(allJars.length);for (int i 0; i allJars.length; i) {try {jarURLs.add(allJars[i].toURI().toURL());} catch (Exception e) {throw DataXException.asDataXException(FrameworkErrorCode.PLUGIN_INIT_ERROR,系统加载jar包出错, e);}}return jarURLs;} } LoadUtil LoadUtil 是一个工具类用于辅助插件的加载和初始化过程。LoadUtil 类通常包含静态方法这些方法简化了插件加载的逻辑使得 DataX 的核心框架能够与具体的插件进行交互。 LoadUtil 的主要职责包括 插件加载LoadUtil 提供了加载插件的方法。这些方法会根据配置文件中指定的插件类型和名称使用 Java 的反射机制来加载插件的类定义。加载过程可能包括查找类路径下的 JAR 文件、读取插件的元数据以及验证插件的合法性。 插件实例化一旦插件类被加载LoadUtil 会负责创建插件的实例。这通常涉及到调用插件类的无参构造函数并返回该实例的引用。LoadUtil 会处理任何与实例化相关的异常以确保在出现问题时能够给出适当的错误消息。 插件注册加载并实例化插件后LoadUtil 可能会将插件实例注册到一个全局的插件注册中心。这样DataX 的其他部分就可以在需要时获取并使用这些插件实例。 配置传递LoadUtil 还可能负责将配置文件中针对插件的配置参数传递给插件实例。这确保了插件能够根据用户的配置进行正确的初始化。 错误处理如果在加载、实例化或配置插件过程中发生错误LoadUtil 会负责处理这些错误。这可能包括记录日志、抛出异常或采取其他恢复措施。 public class LoadUtil {private static final String pluginTypeNameFormat plugin.%s.%s;private LoadUtil() {}private enum ContainerType {Job(Job), Task(Task);private String type;private ContainerType(String type) {this.type type;}public String value() {return type;}}/*** 所有插件配置放置在pluginRegisterCenter中为区别reader、transformer和writer还能区别* 具体pluginName故使用pluginType.pluginName作为key放置在该map中*/private static Configuration pluginRegisterCenter;/*** jarLoader的缓冲*/private static MapString, JarLoader jarLoaderCenter new HashMap();/*** 设置pluginConfigs方便后面插件来获取** param pluginConfigs*/public static void bind(Configuration pluginConfigs) {pluginRegisterCenter pluginConfigs;}private static String generatePluginKey(PluginType pluginType,String pluginName) {return String.format(pluginTypeNameFormat, pluginType.toString(),pluginName);}private static Configuration getPluginConf(PluginType pluginType,String pluginName) {Configuration pluginConf pluginRegisterCenter.getConfiguration(generatePluginKey(pluginType, pluginName));if (null pluginConf) {throw DataXException.asDataXException(FrameworkErrorCode.PLUGIN_INSTALL_ERROR,String.format(DataX不能找到插件[%s]的配置.,pluginName));}return pluginConf;}/*** 加载JobPluginreader、writer都可能要加载** param pluginType* param pluginName* return*/public static AbstractJobPlugin loadJobPlugin(PluginType pluginType,String pluginName) {Class? extends AbstractPlugin clazz LoadUtil.loadPluginClass(pluginType, pluginName, ContainerType.Job);try {AbstractJobPlugin jobPlugin (AbstractJobPlugin) clazz.newInstance();jobPlugin.setPluginConf(getPluginConf(pluginType, pluginName));return jobPlugin;} catch (Exception e) {throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR,String.format(DataX找到plugin[%s]的Job配置.,pluginName), e);}}/*** 加载taskPluginreader、writer都可能加载** param pluginType* param pluginName* return*/public static AbstractTaskPlugin loadTaskPlugin(PluginType pluginType,String pluginName) {Class? extends AbstractPlugin clazz LoadUtil.loadPluginClass(pluginType, pluginName, ContainerType.Task);try {AbstractTaskPlugin taskPlugin (AbstractTaskPlugin) clazz.newInstance();taskPlugin.setPluginConf(getPluginConf(pluginType, pluginName));return taskPlugin;} catch (Exception e) {throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR,String.format(DataX不能找plugin[%s]的Task配置.,pluginName), e);}}/*** 根据插件类型、名字和执行时taskGroupId加载对应运行器** param pluginType* param pluginName* return*/public static AbstractRunner loadPluginRunner(PluginType pluginType, String pluginName) {AbstractTaskPlugin taskPlugin LoadUtil.loadTaskPlugin(pluginType,pluginName);switch (pluginType) {case READER:return new ReaderRunner(taskPlugin);case WRITER:return new WriterRunner(taskPlugin);default:throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR,String.format(插件[%s]的类型必须是[reader]或[writer]!,pluginName));}}/*** 反射出具体plugin实例** param pluginType* param pluginName* param pluginRunType* return*/SuppressWarnings(unchecked)private static synchronized Class? extends AbstractPlugin loadPluginClass(PluginType pluginType, String pluginName,ContainerType pluginRunType) {Configuration pluginConf getPluginConf(pluginType, pluginName);JarLoader jarLoader LoadUtil.getJarLoader(pluginType, pluginName);try {return (Class? extends AbstractPlugin) jarLoader.loadClass(pluginConf.getString(class) $ pluginRunType.value());} catch (Exception e) {throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR, e);}}public static synchronized JarLoader getJarLoader(PluginType pluginType,String pluginName) {Configuration pluginConf getPluginConf(pluginType, pluginName);JarLoader jarLoader jarLoaderCenter.get(generatePluginKey(pluginType,pluginName));if (null jarLoader) {String pluginPath pluginConf.getString(path);if (StringUtils.isBlank(pluginPath)) {throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR,String.format(%s插件[%s]路径非法!,pluginType, pluginName));}jarLoader new JarLoader(new String[]{pluginPath});jarLoaderCenter.put(generatePluginKey(pluginType, pluginName),jarLoader);}return jarLoader;} }ClassLoaderSwapper ClassLoaderSwapper有一个属性storeClassLoader 用于保存着当前线程的classLoader切换之前的ClassLoader。 /*** 为避免jar冲突比如hbase可能有多个版本的读写依赖jar包JobContainer和TaskGroupContainer就需要脱离当前classLoader去加载这些jar包执行完成后又退回到原来classLoader上继续执行接下来的代码*/ public final class ClassLoaderSwapper {private ClassLoader storeClassLoader null;private ClassLoaderSwapper() {}public static ClassLoaderSwapper newCurrentThreadClassLoaderSwapper() {return new ClassLoaderSwapper();}/*** 保存当前classLoader并将当前线程的classLoader设置为所给classLoader** param* return*/public ClassLoader setCurrentThreadClassLoader(ClassLoader classLoader) {this.storeClassLoader Thread.currentThread().getContextClassLoader();Thread.currentThread().setContextClassLoader(classLoader);return this.storeClassLoader;}/*** 将当前线程的类加载器设置为保存的类加载* return*/public ClassLoader restoreCurrentThreadClassLoader() {ClassLoader classLoader Thread.currentThread().getContextClassLoader();Thread.currentThread().setContextClassLoader(this.storeClassLoader);return classLoader;} }
http://www.pierceye.com/news/439837/

相关文章:

  • 同一产品做多个网站山西省住房和城乡建设厅官网
  • 网站建设的流程是什么意思微信小程序的代码
  • 广州网站整站优化html项目案例实战
  • 宁波网站推广方式seo优化按天扣费
  • 紫金优化网站制作python编程100例
  • 原阳网站建设哪家好域名网址
  • 西安学校网站建设wordpress手机端模板下载
  • 泉州网站建设工作室网站上的产品板块
  • 平顶山网站网站建设网页设计与制作教程 刘瑞信 pdf
  • 网站开发深天津设计公司排行榜
  • 做tcf法语听力题的网站公司网页简介
  • 十堰做网站最专业的公司深圳企业网查询
  • 购物网站大全排名调查drupal与wordpress哪个容易
  • 网站建设彳金手指排名网站开发完没人运营
  • 网站建设是设开发公司质量管理流程
  • 金沙网站怎么做代理wordpress tag=
  • 做网站必须花钱吗建筑人才网证书查询
  • 0基础网站建设模板工商注册官方网站
  • 河南网站设计公司价格网站在建设中是什么意思
  • 网站建设公司的成本有哪些方面四川省城乡建设网查询
  • 和什么人合作做游戏视频网站做推送网站
  • 做竞价网站访问突然变少施工企业负责人带班检查计划
  • 网站统计数据分析wordpress安装 第二步
  • 网站续费续的是什么钱Wordpress1002无标题
  • 公司入口网站appui设计师创意平台
  • 济南住房和城乡建设厅网站影视广告创意拍摄
  • 卢松松网站源码网站建设讲师招聘
  • wordpress建站网页无法运vs网站开发表格大小设置
  • 网站怎么制作教程科技小论文怎么写
  • 青岛外贸建设网站制作小程序制作页面教程