关于新农村网络建设网站,视频制作表情包,如何进行专业建设,网站集约化建设的优点从我担任软件工程师的第一天起#xff0c;我总是听到很多方面的相同要求#xff1a; “ 我们希望所有内容都可配置#xff0c;我们希望在运行时更改所有内容#xff0c;我们希望有一个可视化工具来应用所有这些逻辑#xff0c;以便非开发人员使用和配置我们的应用程序。 … 从我担任软件工程师的第一天起我总是听到很多方面的相同要求 “ 我们希望所有内容都可配置我们希望在运行时更改所有内容我们希望有一个可视化工具来应用所有这些逻辑以便非开发人员使用和配置我们的应用程序。 ” 我也喜欢这种通用范围但是众所周知软件系统的适应性不强客户的需求也不稳定。 在过去的几年中我们已经使用传统的框架/技术JMX分布式缓存Spring或JEE等构建了此类可配置应用程序并非100可配置。 近年来我们的体系结构中还必须包含一个附加概念这就是大数据 或3V或4V或任何更合适的词的概念。 这个新概念淘汰了我们熟悉并在旧的3层应用程序中应用的各种解决方案或变通方法。 有趣的是我很多次都和十年前一样。 这是软件开发的规则它永远不会结束因此个人才能和新冒险也永远不会结束:-) 主要问题仍然是相同的即如何构建可配置的ETL分布式应用程序 。 因此我建立了一个小型的适应性强的解决方案该解决方案在许多用例中可能会有所帮助。 我在大数据世界中使用了3种常用工具 Java Apache Storm和Kite SDK Morplines 。 Java是主要的编程语言 Apache Storm是分布式流处理引擎而Kite SDK Morphlines是可配置的ETL引擎。 风筝SDK Morplines 从其描述复制而来 Morphlines是一个开源框架它减少了构建和更改Hadoop ETL流处理应用程序所需的时间和精力该应用程序可将数据提取转换并加载到Apache SolrHBaseHDFSEnterprise Data Warehouse或Analytic Online Dashboards中。 morphline是一个丰富的配置文件可以轻松定义一个转换链该转换链可以使用来自任何类型数据源的任何类型的数据处理数据并将结果加载到Hadoop组件中。 它用简单的配置步骤代替了Java编程并相应地减少了与开发和维护定制ETL项目相关的成本和集成工作。 除了内置命令外 您还可以轻松实现自己的命令 并在吗啉配置文件中使用它。 示例Morphline配置读取一个JSON字符串解析它然后只记录一个特定的JSON元素 morphlines : [{id : json_terminal_logimportCommands : [org.kitesdk.**]commands : [# read the JSON blob{ readJson: {} }# extract JSON objects into head fields{ extractJsonPaths {flatten: truepaths: {name: /nameage: /age}} }# log data{ logInfo {format : name: {}, record: {}args : [{name}, {}]}}]
}]风暴变身螺栓 为了在Storm中使用Morphlines我实现了一个自定义MorphlinesBolt 。 该螺栓的主要职责是 通过配置文件初始化Morphlines处理程序 初始化映射说明 a从元组到吗啉输入以及 b从Morphline输出到新的输出元组 使用已初始化的Morplines上下文处理每个传入事件 如果Bolt不是Terminal 则使用提供的Mapper 类型“ b”使用Morphline执行的输出发出一个新的Tuple。 简单的可配置ETL拓扑 为了测试自定义MorphlinesBolt 我编写了2个简单的测试。 在这些测试中您可以看到MorphlinesBolt是如何初始化的然后是每次执行的结果。 作为输入我使用了一个自定义的SpoutRandomJsonTestSpout它仅每100毫秒发出一次新的JSON字符串可配置。 DummyJsonTerminalLogTopology 一个简单的拓扑 该拓扑通过配置文件和每个传入的元组的执行Morphline处理程序来配置Morphline上下文。 在此拓扑上 MorphlinesBolt被配置为终端螺栓这意味着对于每个输入Tuple不会发出新的Tuple。 public class DummyJsonTerminalLogTopology {public static void main(String[] args) throws Exception {Config config new Config();RandomJsonTestSpout spout new RandomJsonTestSpout().withComplexJson(false);String2ByteArrayTupleMapper tuppleMapper new String2ByteArrayTupleMapper();tuppleMapper.configure(CmnStormCons.TUPLE_FIELD_MSG);MorphlinesBolt morphBolt new MorphlinesBolt().withTupleMapper(tuppleMapper).withMorphlineId(json_terminal_log).withMorphlineConfFile(target/test-classes/morphline_confs/json_terminal_log.conf);TopologyBuilder builder new TopologyBuilder();builder.setSpout(WORD_SPOUT, spout, 1);builder.setBolt(MORPH_BOLT, morphBolt, 1).shuffleGrouping(WORD_SPOUT);if (args.length 0) {LocalCluster cluster new LocalCluster();cluster.submitTopology(MyDummyJsonTerminalLogTopology, config, builder.createTopology());Thread.sleep(10000);cluster.killTopology(MyDummyJsonTerminalLogTopology);cluster.shutdown();System.exit(0);} else if (args.length 1) {StormSubmitter.submitTopology(args[0], config, builder.createTopology());} else {System.out.println(Usage: DummyJsonTerminalLogTopology topology_name);}}
}DummyJson2StringTopology 一个简单的拓扑 该拓扑通过配置文件和每个传入的元组的执行Morphline处理程序来配置Morphline上下文。 在此拓扑上 MorphlinesBolt被配置为普通螺栓这意味着对于每个输入Tuple它都会发出一个新的Tuple。 public class DummyJson2StringTopology {public static void main(String[] args) throws Exception {Config config new Config();RandomJsonTestSpout spout new RandomJsonTestSpout().withComplexJson(false);String2ByteArrayTupleMapper tuppleMapper new String2ByteArrayTupleMapper();tuppleMapper.configure(CmnStormCons.TUPLE_FIELD_MSG);MorphlinesBolt morphBolt new MorphlinesBolt().withTupleMapper(tuppleMapper).withMorphlineId(json2string).withMorphlineConfFile(target/test-classes/morphline_confs/json2string.conf)//.withOutputProcessors(Arrays.asList(resultRecordHandlers));.withOutputFields(CmnStormCons.TUPLE_FIELD_MSG).withRecordMapper(RecordHandlerFactory.genDefaultRecordHandler(String.class, new JsonNode2StringResultMapper()));LoggingBolt printBolt new LoggingBolt().withFields(CmnStormCons.TUPLE_FIELD_MSG);TopologyBuilder builder new TopologyBuilder();builder.setSpout(WORD_SPOUT, spout, 1);builder.setBolt(MORPH_BOLT, morphBolt, 1).shuffleGrouping(WORD_SPOUT);builder.setBolt(PRINT_BOLT, printBolt, 1).shuffleGrouping(MORPH_BOLT);if (args.length 0) {LocalCluster cluster new LocalCluster();cluster.submitTopology(MyDummyJson2StringTopology, config, builder.createTopology());Thread.sleep(10000);cluster.killTopology(MyDummyJson2StringTopology);cluster.shutdown();System.exit(0);} else if (args.length 1) {StormSubmitter.submitTopology(args[0], config, builder.createTopology());} else {System.out.println(Usage: DummyJson2StringTopology topology_name);}}
}最后的想法 MorphlinesBolt可以用作任何可配置ETL“解决方案”的一部分作为单处理Bolt作为终端Bolt作为复杂管道的一部分等等。 在github中的示例项目集中源代码作为Maven模块 sv-etl-storm-morphlines 提供。 最好的组合是将MorphlinesBolt与Flux一起使用。 这可能会为您提供完全可配置的ETL拓扑 我还没有添加为选项以便保持较少的依赖关系我可以添加范围“ test”。 该模块不是最终模块我将尝试对其进行改进因此许多人会在第一个实现中发现各种错误。 对于任何其他想法或说明请写评论 这是我2016年的第一篇文章 希望您身体健康思想和行动更好。 一切的第一项美德/价值是人类以及对我们所生活的环境社会地球动物植物等的尊重。 所有其他都是次要优先事项不应破坏优先事项所隐含的内容。 始终牢记最重要的美德并在您采取的任何行动或思想中考虑它们。 翻译自: https://www.javacodegeeks.com/2016/01/configurable-etl-processing-using-apache-storm-kite-sdk-morphlines.html