网站js修改代码,网络营销公司主要做些什么,html插件代码大全,免费个人自助建站在大多数软件开发项目中#xff0c;有一点需要使应用程序开始与其他应用程序或第三方组件通信。 无论是发送电子邮件通知#xff0c;调用外部api#xff0c;写入文件还是将数据从一个地方迁移到另一个地方#xff0c;您都可以推出自己的解决方案或利用现有框架。 对于Jav… 在大多数软件开发项目中有一点需要使应用程序开始与其他应用程序或第三方组件通信。 无论是发送电子邮件通知调用外部api写入文件还是将数据从一个地方迁移到另一个地方您都可以推出自己的解决方案或利用现有框架。 对于Java生态系统中的现有框架我们可以发现Tibco BusinessWorks和Mule ESB 另一方面是Spring Integration和Apache Camel 。 在本教程中我将通过一个示例应用程序向您介绍Apache Camel 该示例应用程序从Twitter的示例提要中读取推文并使用Elastic Search实时索引这些推文。 什么是Apache Camel 将应用程序与生态系统中的内部或外部组件集成是软件开发中最复杂的任务之一如果操作不正确则可能导致混乱不堪并且长期维护很麻烦。 幸运的是Camel是Apache托管的开放源代码集成框架它基于企业集成模式 这些模式可以帮助编写更具可读性和可维护性的代码。 与Lego相似这些模式可以用作构建可靠软件设计的基础。 Apache Camel还支持各种各样的连接器以将您的应用程序与不同的框架和技术集成在一起。 顺便说一下它也可以与Spring很好地配合使用。 如果您不熟悉Spring那么您可能会发现这篇文章很有帮助 使用Spring Boot处理Twitter feed 。 在以下各节中我们将介绍一个示例应用程序其中Camel与Twitter示例提要和ElasticSearch集成在一起。 什么是ElasticSearch 类似于Apache Solr的 ElasticSearch是一个高度可扩展的开源基于Java的全文本搜索引擎构建在Apache Lucene之上。 在此示例应用程序中我们将使用ElasticSearch实时索引推文并在这些推文上提供全文本搜索功能。 其他使用的技术 除了Apache Camel和ElasticSearch我还在此应用程序中包括其他框架 Gradle作为构建工具 Spring Boot作为Web应用程序框架以及Twitter4j用于从Twitter示例提要中读取推文。 入门 该项目的框架是在http://start.spring.io生成的在那里我检查了Web依赖项选项填写了“项目元数据”部分然后选择“ Gradle Project”作为项目类型。 生成项目后您可以下载并将其导入您喜欢的IDE。 我现在不打算在Gradle上做更多的细节但是这是build.gradle文件中所有依赖项的列表 def camelVersion 2.15.2
dependencies {compile(org.springframework.boot:spring-boot-starter-web)compile(org.apache.camel:camel-core:${camelVersion})compile(org.apache.camel:camel-spring-boot:${camelVersion})compile(org.apache.camel:camel-twitter:${camelVersion})compile(org.apache.camel:camel-elasticsearch:${camelVersion})compile(org.apache.camel:camel-jackson:${camelVersion})compile(joda-time:joda-time:2.8.2)testCompile(org.springframework.boot:spring-boot-starter-test)
}使用骆驼路线进行整合 骆驼实现了面向消息的体系结构它的主要构建模块是描述消息流的路由 。 可以使用XML旧方式或其Java DSL新方式描述路由。 我们将在本文中仅讨论Java DSL因为这是首选且更优雅的选择。 好吧让我们看一个简单的Route from(file://orders).convertBodyTo(String.class).to(log:com.mycompany.order?levelDEBUG).to(jms:topic:OrdersTopic); 这里有几件事要注意 消息在由URI表示并使用URI配置的端点之间流动 路由只能有一个消息生产者端点在本例中为“ file// orders”它从orders文件夹中读取文件和多个消息消费者端点 “ logcom.mycompany.orderlevel DEBUG”它将文件的内容记录在com.mycompany.order日志记录类别下的调试消息中 在端点之间可以更改消息即convertBodyToString.class将消息正文转换为String。 另请注意相同的URI可以在一个路由中用于消费者端点而在另一路由中用于生产者端点 from(file://orders).convertBodyTo(String.class).to(direct:orders);from(direct:orders).to(log:com.mycompany.order?levelDEBUG).to(jms:topic:OrdersTopic); Direct端点是通用端点之一它允许将消息从一条路由同步传递到另一条路由。 这有助于创建可读代码并在代码的多个位置重用路由。 索引推文 现在让我们看一下代码中的一些路由。 让我们从简单的事情开始 private String ES_TWEET_INDEXER_ENDPOINT direct:tweet-indexer-ES;...from(twitter://streaming/sample?typeEVENTconsumerKey{{twitter4j.oauth.consumerKey}}consumerSecret{{twitter4j.oauth.consumerSecret}}∾cessToken{{twitter4j.oauth.accessToken}}∾cessTokenSecret{{twitter4j.oauth.accessTokenSecret}}).to(ES_TWEET_INDEXER_ENDPOINT); 这是如此简单对吧 到现在为止您可能已经知道该路由从Twitter示例提要中读取了推文并将它们传递到“ directtweet-indexer-ES”端点。 请注意consumerKeyconsumerSecret等已配置并作为系统属性传递请参见http://twitter4j.org/en/configuration.html 。 现在让我们看一下一个稍微复杂的Route该路由从“ directtweet-indexer-ES”端点读取并将Tweets批量插入到Elasticsearch中有关每个步骤的详细说明请参见注释 Value(${elasticsearch.tweet.uri})private String elasticsearchTweetUri;...from(ES_TWEET_INDEXER_ENDPOINT)// groups tweets into separate indexes on a weekly basis to make it easier clean up old tweets:.process(new WeeklyIndexNameHeaderUpdater(ES_TWEET_INDEX_TYPE))// converts Twitter4j Tweet object into an elasticsearch document represented by a Map:.process(new ElasticSearchTweetConverter())// collects tweets into weekly batches based on index name:.aggregate(header(indexName), new ListAggregationStrategy())// creates new batches every 2 seconds.completionInterval(2000)// makes sure the last batch will be processed before the application shuts down:.forceCompletionOnStop()// inserts a batch of tweets to elasticsearch: .to(elasticsearchTweetUri).log(Uploaded documents to ElasticSearch index ${headers.indexName}: ${body.size()}); 关于此路线的注意事项 elasticsearchTweetUri是一个字段其值由Spring从application.properties文件elasticsearch.tweet.uri elasticsearch// tweet-indexeroperation BULK_INDEXip 127.0.0.1port 9300中获取并注入到该字段中 为了在Route中实现自定义处理逻辑我们可以创建实现Processor接口的类。 参见WeeklyIndexNameHeaderUpdater和ElasticSearchTweetConverter 使用自定义ListAggregationStrategy策略聚合推文该策略将消息聚合到ArrayList中稍后每2秒或在应用程序停止时传递给下一个终结点 Camel实现了一种表达语言 我们正在使用它来记录批处理的大小“ $ {body.size}”和插入消息的索引的名称$ {headers.indexName}。 在Elasticsearch中搜索推文 现在我们已经在Elasticsearch中索引了推文是时候对其进行一些搜索了。 首先让我们看一下接收搜索查询的Route和限制搜索结果数量的maxSize参数 public static final String TWEET_SEARCH_URI vm:tweetSearch;...from(TWEET_SEARCH_URI).setHeader(CamelFileName, simple(tweet-${body}-${header.maxSize}-${date:now:yyyyMMddHHmmss}.txt))// calls the search() method of the esTweetService which returns an iterator// to process search result - better than keeping the whole resultset in memory:.split(method(esTweetService, search))// converts Elasticsearch doucment to Map object:.process(new ElasticSearchSearchHitConverter())// serializes the Map object to JSON:.marshal(new JacksonDataFormat())// appends new line at the end of every tweet.setBody(simple(${body}\n))// write search results as json into a file under /tmp folder:.to(file:/tmp?fileExistAppend).end().log(Wrote search results to /tmp/${headers.CamelFileName}); 当消息传递到“ vmtweetSearch”端点该端点使用内存队列异步处理消息时将触发此路由。 SearchController类实现了REST api允许用户通过使用Camel的ProducerTemplate类将消息发送到“ vmtweetSearch”端点来运行tweet搜索 Autowiredprivate ProducerTemplate producerTemplate;RequestMapping(value /tweet/search, method { RequestMethod.GET, RequestMethod.POST },produces MediaType.TEXT_PLAIN_VALUE)ResponseBodypublic String tweetSearch(RequestParam(q) String query,RequestParam(value max) int maxSize) {LOG.info(Tweet search request received with query: {} and max: {}, query, maxSize);MapString, Object headers new HashMapString, Object();// content is the field in the Elasticsearch index that well be querying:headers.put(queryField, content);headers.put(maxSize, maxSize);producerTemplate.asyncRequestBodyAndHeaders(CamelRouter.TWEET_SEARCH_URI, query, headers);return Request is queued;} 这将触发Elasticsearch的执行但是结果不会在响应中返回而是写入/ tmp文件夹中的文件如前所述。 此路由使用ElasticSearchService类在ElasticSearch中搜索推文。 当执行此Route时Camel调用search方法并传递搜索查询和maxSize作为输入参数 public SearchHitIterator search(Body String query, Header(value queryField) String queryField, Header(value maxSize) int maxSize) {boolean scroll maxSize batchSize;LOG.info(Executing {} on index type: {} with query: {} and max: {}, scroll ? scan scroll : search, indexType, query, maxSize);QueryBuilder qb termQuery(queryField, query);long startTime System.currentTimeMillis();SearchResponse response scroll ? prepareSearchForScroll(maxSize, qb) : prepareSearchForRegular(maxSize, qb);return new SearchHitIterator(client, response, scroll, maxSize, KEEP_ALIVE_MILLIS, startTime);} 请注意根据maxSize和batchSize代码将执行常规搜索以返回单页结果或者执行滚动请求以使我们能够检索大量结果。 在滚动的情况下 SearchHitIterator将随后对Elasticsearch进行调用以分批检索结果。 安装ElasticSearch 从https://www.elastic.co/downloads/elasticsearch下载Elasticsearch。 将其安装到本地文件夹$ ES_HOME 编辑$ ES_HOME / config / elasticsearch.yml并添加以下行 cluster.nametweet-indexer 安装BigDesk插件以监视Elasticsearch$ ES_HOME / bin / plugin -install lukas-vlcek / bigdesk 运行Elasticsearch$ ES_HOME / bin / elasticsearch.sh或$ ES_HOME / bin / elasticsearch.bat 这些步骤将允许您以最少的配置运行独立的Elasticsearch实例但请记住它们并非供生产使用。 运行 这是应用程序的入口点可以从命令行运行。 package com.kaviddiss.twittercamel;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;SpringBootApplication
public class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);}
} 要运行该应用程序请从您最喜欢的IDE运行Application.main方法或者从命令行执行以下代码 $GRADLE_HOME/bin/gradlew build java -jar build/libs/twitter-camel-ingester-0.0.1-SNAPSHOT.jar 一旦应用程序启动它将自动开始索引推文。 转到http// localhost9200 / _plugin / bigdesk /cluster可视化索引 要搜索推文请在浏览器中输入与此类似的URL http// localhost8080 / tweet / searchq torontomax 100 。 使用BigDesk插件我们可以监视Elasticsearch如何索引推文 结论 在Apache Camel的简介中我们介绍了如何使用此集成框架与Twitter提要feed和Elasticsearch之类的外部组件进行通信以实时索引和搜索推文。 示例应用程序的源代码可从https://github.com/davidkiss/twitter-camel-ingester获得 。 翻译自: https://www.javacodegeeks.com/2015/09/learn-apache-camel-indexing-tweets-in-real-time.html