influxdb-client-scala
The reference Scala client for querying and writing to InfluxDB 2.x, built on Akka Streams. The client is cross-built against Scala 2.12 and 2.13.
This section contains links to the client library documentation.
The query API (QueryScalaApi) is based on Akka Streams.
Queries are written in the Flux language. The following example runs a Flux query and parses the result line by line using the queryRaw method:
package example
import akka.actor.ActorSystem
import akka.stream.scaladsl.Sink
import com.influxdb.client.scala.InfluxDBClientScalaFactory
import scala.concurrent.Await
import scala.concurrent.duration.Duration
object InfluxDB2ScalaExampleRaw {

  implicit val system: ActorSystem = ActorSystem("it-tests")

  def main(args: Array[String]): Unit = {
    val influxDBClient = InfluxDBClientScalaFactory
      .create("http://localhost:8086", "my-token".toCharArray, "my-org")

    val fluxQuery = ("from(bucket: \"my-bucket\")\n"
      + " |> range(start: -5m)"
      + " |> sample(n: 5, pos: 1)")

    // Result is returned as a stream
    val sink = influxDBClient.getQueryScalaApi().queryRaw(fluxQuery)
      // print results
      .runWith(Sink.foreach[String](it => println(s"Line: $it")))

    // wait to finish
    Await.result(sink, Duration.Inf)

    influxDBClient.close()
    system.terminate()
  }
}
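Each element emitted by queryRaw is one line of the annotated CSV that InfluxDB returns for a Flux query. As a rough illustration of line-by-line handling, the sketch below splits a line into columns and skips blank lines and annotation lines (those starting with #). RawLines is a hypothetical helper, not part of the client, and the naive comma split assumes that no quoted field contains a comma.

```scala
object RawLines {
  // Hypothetical helper, not part of influxdb-client-scala.
  // Returns the columns of a header or data line, or None for blank
  // lines and annotation lines such as "#datatype,...".
  def columns(line: String): Option[Vector[String]] = {
    val trimmed = line.trim
    if (trimmed.isEmpty || trimmed.startsWith("#")) None
    else Some(trimmed.split(",", -1).toVector) // limit -1 keeps empty trailing columns
  }
}
```

In a stream like the one above, such a function could be applied in a map stage ahead of the sink.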
A client can be configured via a configuration file. The file must be named influx2.properties and must be located in the root of the classpath.
The influx2.readTimeout, influx2.writeTimeout and influx2.connectTimeout properties support ms, s and m as units. The default unit is milliseconds.
Configuration example
influx2.url=http://localhost:8086
influx2.org=my-org
influx2.bucket=my-bucket
influx2.token=my-token
influx2.logLevel=BODY
influx2.readTimeout=5s
influx2.writeTimeout=10s
influx2.connectTimeout=5s
and then:
val influxDBClient = InfluxDBClientScalaFactory.create();
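If the settings do not seem to be picked up, it can help to confirm that the file is actually visible at the classpath root. The following is a minimal sketch using only the JDK; ClasspathConfigCheck is a hypothetical helper for troubleshooting and is not how the client itself is documented to load the file.

```scala
import java.io.InputStream
import java.util.Properties

object ClasspathConfigCheck {
  // Hypothetical helper, not part of influxdb-client-scala.
  // The leading "/" makes the lookup relative to the classpath root.
  def load(resource: String = "/influx2.properties"): Option[Properties] = {
    val stream: InputStream = getClass.getResourceAsStream(resource)
    if (stream == null) None // resource not found on the classpath
    else {
      try {
        val props = new Properties()
        props.load(stream)
        Some(props)
      } finally stream.close()
    }
  }
}
```

A result of None from ClasspathConfigCheck.load() suggests the file is missing from the classpath root or is named differently.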
Client connection string
A client can be constructed using a connection string that can contain the InfluxDBClientOptions parameters encoded into the URL.
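As an illustration of what such a connection string looks like, the sketch below assembles one from a base URL and a list of options. ConnectionStrings is a hypothetical helper written for this example, not part of the client; the option names are taken from the table in this section.

```scala
import java.net.URLEncoder

object ConnectionStrings {
  // Hypothetical helper, not part of influxdb-client-scala.
  // Appends the options to the base URL as a URL-encoded query string.
  def build(baseUrl: String, options: Seq[(String, String)]): String =
    if (options.isEmpty) baseUrl
    else {
      val query = options
        .map { case (key, value) => s"${encode(key)}=${encode(value)}" }
        .mkString("&")
      s"$baseUrl?$query"
    }

  private def encode(s: String): String = URLEncoder.encode(s, "UTF-8")
}

object ConnectionStringExample {
  // Yields "http://localhost:8086?readTimeout=5000&connectTimeout=5000&logLevel=BASIC"
  val connectionString: String = ConnectionStrings.build(
    "http://localhost:8086",
    Seq("readTimeout" -> "5000", "connectTimeout" -> "5000", "logLevel" -> "BASIC")
  )
}
```

The resulting string would then be handed to the client factory, for example through a single-argument InfluxDBClientScalaFactory.create(connectionString) call, assuming that overload is available in the version you use.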
The following options are supported:
Property name | default | description |
---|---|---|
org | - | default destination organization for writes and queries |
bucket | - | default destination bucket for writes |
token | - | the token to use for the authorization |
logLevel | NONE | rest client verbosity level |
readTimeout | 10000 ms | read timeout |
writeTimeout | 10000 ms | write timeout |
connectTimeout | 10000 ms | socket timeout |
The readTimeout, writeTimeout and connectTimeout options support ms, s and m as units. The default unit is milliseconds.
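To make the unit rule concrete, the sketch below converts a timeout value such as 5s, 2m, 500ms or 10000 into milliseconds. TimeoutUnits is a hypothetical helper that only mirrors the rule described above; it is not the parser used by the client.

```scala
object TimeoutUnits {
  // Hypothetical helper, not part of influxdb-client-scala.
  // Accepts a whole number followed by an optional unit: ms, s or m.
  private val Pattern = """^(\d+)(ms|s|m)?$""".r

  def toMillis(value: String): Long = value.trim match {
    case Pattern(amount, unit) =>
      val n = amount.toLong
      unit match {
        case "s" => n * 1000L        // seconds
        case "m" => n * 60L * 1000L  // minutes
        case _   => n                // "ms" or no unit: already milliseconds
      }
    case other =>
      throw new IllegalArgumentException(s"Unsupported timeout value: $other")
  }
}
```

For example, TimeoutUnits.toMillis("5s") and TimeoutUnits.toMillis("5000") both describe the same five-second timeout.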
Gzip support
InfluxDBClientScala does not enable gzip compression for HTTP requests by default. To enable gzip and reduce the size of transferred data, call:
influxDBClient.enableGzip()
The verbosity of HTTP request and response logging can be changed with setLogLevel:
influxDBClient.setLogLevel(LogLevel.HEADERS)
Check the server status
Server availability can be checked by calling influxDBClient.ping().
Construct queries using the query builder
package example
import java.time.temporal.ChronoUnit
import akka.actor.ActorSystem
import akka.stream.scaladsl.Sink
import com.influxdb.client.scala.InfluxDBClientScalaFactory
import com.influxdb.query.FluxRecord
import com.influxdb.query.dsl.Flux
import com.influxdb.query.dsl.functions.restriction.Restrictions
import scala.concurrent.Await
import scala.concurrent.duration.Duration
object InfluxDB2ScalaExampleDSL {

  implicit val system: ActorSystem = ActorSystem("it-tests")

  def main(args: Array[String]): Unit = {
    val influxDBClient = InfluxDBClientScalaFactory
      .create("http://localhost:8086", "my-token".toCharArray, "my-org")

    // Build the Flux query with the DSL: the last 30 minutes of mem.used_percent
    val mem = Flux.from("my-bucket")
      .range(-30L, ChronoUnit.MINUTES)
      .filter(Restrictions.and(Restrictions.measurement().equal("mem"), Restrictions.field().equal("used_percent")))

    // Result is returned as a stream
    val results = influxDBClient.getQueryScalaApi().query(mem.toString())

    // Example of additional result stream processing on the client side
    val sink = results
      // filter on the client side using the built-in `filter` operator
      .filter(it => it.getValue.asInstanceOf[Double] > 55)
      // take the first 20 records
      .take(20)
      // print results
      .runWith(Sink.foreach[FluxRecord](it => println(s"Measurement: ${it.getMeasurement}, value: ${it.getValue}")))

    // wait to finish
    Await.result(sink, Duration.Inf)

    influxDBClient.close()
    system.terminate()
  }
}
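Scala 2.12
The latest version as a Maven dependency:
<dependency>
  <groupId>com.influxdb</groupId>
  <artifactId>influxdb-client-scala_2.12</artifactId>
  <version>5.0.0</version>
</dependency>
Or when using Gradle:
dependencies {
  implementation "com.influxdb:influxdb-client-scala_2.12:5.0.0"
}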
Scala 2.13
The latest version as a Maven dependency:
<dependency>
  <groupId>com.influxdb</groupId>
  <artifactId>influxdb-client-scala_2.13</artifactId>
  <version>5.0.0</version>
</dependency>
Or when using Gradle:
dependencies {
  implementation "com.influxdb:influxdb-client-scala_2.13:5.0.0"
}
Snapshot Repository
Snapshots are deployed to the OSS Snapshot repository.
Maven
Gradle
repositories {
maven { url "https://oss.sonatype.org/content/repositories/snapshots" }