制作外贸网站模板,漯河英文网站建设,网站设计分工,推荐做网站的话术我很高兴宣布Pygmalios开发的ReactiveInflux的第一个发行版。 InfluxDB错过了Scala和Java的非阻塞驱动程序。 不变性#xff0c;可测试性和可扩展性是ReactiveInflux的关键功能。 加上对Apache Spark的支持#xff0c;它是首选武器。 https://github.com/pygmalios/reactive… 我很高兴宣布Pygmalios开发的ReactiveInflux的第一个发行版。 InfluxDB错过了Scala和Java的非阻塞驱动程序。 不变性可测试性和可扩展性是ReactiveInflux的关键功能。 加上对Apache Spark的支持它是首选武器。 https://github.com/pygmalios/reactiveinflux 它在内部使用Play Framework WS API 它是基于Async Http Client构建的丰富的异步HTTP客户端 。 特征 Scala的异步非阻塞接口 Scala和Java的同步阻塞接口 同时支持Spark和Spark流 不变性 可测性 可扩展性 兼容性 InfluxDB 0.11、0.10和0.9甚至可能更旧 Scala 2.11和2.10 Java 7及以上 Apache Spark 1.4及更高版本 Scala异步非阻塞示例 val result withInfluxDb(new URI(http://localhost:8086/), example1) { db db.create().flatMap { _ val point Point(time DateTime.now(),measurement measurement1,tags Map(t1 - A, t2 - B),fields Map(f1 - 10.3,f2 - x,f3 - -1,f4 - true))db.write(point).flatMap { _ db.query(SELECT * FROM measurement1).flatMap { queryResult println(queryResult.row.mkString)db.drop()}}}
}Scala同步阻塞示例 implicit val awaitAtMost 10.seconds
syncInfluxDb(new URI(http://localhost:8086/), example1) { db db.create()val point Point(time DateTime.now(),measurement measurement1,tags Map(t1 - A, t2 - B),fields Map(f1 - 10.3,f2 - x,f3 - -1,f4 - true))db.write(point)val queryResult db.query(SELECT * FROM measurement1)println(queryResult.row.mkString)db.drop()
}Java同步阻塞示例 // Use Influx at the provided URL
ReactiveInfluxConfig config new JavaReactiveInfluxConfig(new URI(http://localhost:8086/));
long awaitAtMostMillis 30000;
try (SyncReactiveInflux reactiveInflux new JavaSyncReactiveInflux(config, awaitAtMostMillis)) {SyncReactiveInfluxDb db reactiveInflux.database(example1);db.create();Map tags new HashMap();tags.put(t1, A);tags.put(t2, B);Map fields new HashMap();fields.put(f1, 10.3);fields.put(f2, x);fields.put(f3, -1);fields.put(f4, true);Point point new JavaPoint(DateTime.now(),measurement1,tags,fields);db.write(point);QueryResult queryResult db.query(SELECT * FROM measurement1);System.out.println(queryResult.getRow().mkString());db.drop();
}Apache Spark Scala示例 val point1 Point(time DateTime.now(),measurement measurement1,tags Map(tagKey1 - tagValue1,tagKey2 - tagValue2),fields Map(fieldKey1 - fieldValue1,fieldKey2 - 10.7)
)
sc.parallelize(Seq(point1)).saveToInflux()Apache Spark流Scala示例 val point1 Point(time DateTime.now(),measurement measurement1,tags Map(tagKey1 - tagValue1,tagKey2 - tagValue2),fields Map(fieldKey1 - fieldValue1,fieldKey2 - 10.7)
)
val queue new mutable.Queue[RDD[Point]]
queue.enqueue(ssc.sparkContext.parallelize(Seq(point1)))
ssc.queueStream(queue).saveToInflux()Apache Spark Java示例 ...
SparkInflux sparkInflux new SparkInflux(example, 1000);
sparkInflux.saveToInflux(sc.parallelize(Collections.singletonList(point)));Apache Spark流Java示例 ...
SparkInflux sparkInflux new SparkInflux(example, 1000);
Queue queue new LinkedList();
queue.add(ssc.sparkContext().parallelize(Collections.singletonList(point)));
sparkInflux.saveToInflux(ssc.queueStream(queue)); 斯洛伐克布拉迪斯拉发的高科技初创公司投资于尖端技术以确保实时预测零售分析领域的快速增长。 翻译自: https://www.javacodegeeks.com/2016/04/introducing-reactiveinflux-non-blocking-influxdb-driver-scala-java-supporting-apache-spark.html