• Version
    • Scala 2.12
    • Scala 2.13
    • Snapshot Repository
      • Maven
      • Gradle

    influxdb-client-scala

    The reference Scala client that allows query and write for the InfluxDB 2.x by . The client is cross-built against Scala and 2.13.

    This section contains links to the client library documentation.

    The is based on the Akka Streams.

    The following example demonstrates querying using the Flux language:

    It is possible to parse a result line-by-line using the queryRaw method:

    1. package example
    2. import akka.actor.ActorSystem
    3. import akka.stream.scaladsl.Sink
    4. import com.influxdb.client.scala.InfluxDBClientScalaFactory
    5. import scala.concurrent.Await
    6. import scala.concurrent.duration.Duration
    7. object InfluxDB2ScalaExampleRaw {
    8. implicit val system: ActorSystem = ActorSystem("it-tests")
    9. def main(args: Array[String]): Unit = {
    10. val influxDBClient = InfluxDBClientScalaFactory
    11. .create("http://localhost:8086", "my-token".toCharArray, "my-org")
    12. val fluxQuery = ("from(bucket: \"my-bucket\")\n"
    13. + " |> range(start: -5m)"
    14. + " |> sample(n: 5, pos: 1)")
    15. //Result is returned as a stream
    16. val sink = influxDBClient.getQueryScalaApi().queryRaw(fluxQuery)
    17. //print results
    18. .runWith(Sink.foreach[String](it => println(s"Line: $it")))
    19. // wait to finish
    20. Await.result(sink, Duration.Inf)
    21. influxDBClient.close()
    22. system.terminate()
    23. }
    24. }

    A client can be configured via configuration file. The configuration file has to be named as influx2.properties and has to be in root of classpath.

    The influx2.readTimeout, influx2.writeTimeout and influx2.connectTimeout supports , s and m as unit. Default is milliseconds.

    Scala - 图8Configuration example
    1. influx2.url=http://localhost:8086
    2. influx2.org=my-org
    3. influx2.bucket=my-bucket
    4. influx2.token=my-token
    5. influx2.logLevel=BODY
    6. influx2.readTimeout=5s
    7. influx2.writeTimeout=10s
    8. influx2.connectTimeout=5s

    and then:

    1. val influxDBClient = InfluxDBClientScalaFactory.create();

    Client connection string

    A client can be constructed using a connection string that can contain the InfluxDBClientOptions parameters encoded into the URL.

    The following options are supported:

    Property namedefaultdescription
    org-default destination organization for writes and queries
    bucket-default destination bucket for writes
    token-the token to use for the authorization
    logLevelNONErest client verbosity level
    readTimeout10000 msread timeout
    writeTimeout10000 mswrite timeout
    connectTimeout10000 mssocket timeout

    The readTimeout, writeTimeout and connectTimeout supports ms, s and m as unit. Default is milliseconds.

    Scala - 图10Gzip support

    InfluxDBClientScala does not enable gzip compress for http requests by default. If you want to enable gzip to reduce transfer data’s size, you can call:

    1. influxDBClient.enableGzip();
    1. influxDBClient.setLogLevel(LogLevel.HEADERS)

    Check the server status

    Server availability can be checked using the influxDBClient.ping() endpoint.

    Scala - 图13Construct queries using the query builder

    1. package example
    2. import java.time.temporal.ChronoUnit
    3. import akka.actor.ActorSystem
    4. import akka.stream.scaladsl.Sink
    5. import com.influxdb.client.scala.InfluxDBClientScalaFactory
    6. import com.influxdb.query.FluxRecord
    7. import com.influxdb.query.dsl.Flux
    8. import com.influxdb.query.dsl.functions.restriction.Restrictions
    9. import scala.concurrent.Await
    10. import scala.concurrent.duration.Duration
    11. object InfluxDB2ScalaExampleDSL {
    12. implicit val system: ActorSystem = ActorSystem("it-tests")
    13. val influxDBClient = InfluxDBClientScalaFactory
    14. .create("http://localhost:8086", "my-token".toCharArray, "my-org")
    15. val mem = Flux.from("my-bucket")
    16. .range(-30L, ChronoUnit.MINUTES)
    17. .filter(Restrictions.and(Restrictions.measurement().equal("mem"), Restrictions.field().equal("used_percent")))
    18. //Result is returned as a stream
    19. val results = influxDBClient.getQueryScalaApi().query(mem.toString())
    20. //Example of additional result stream processing on client side
    21. val sink = results
    22. //filter on client side using `filter` built-in operator
    23. .filter(it => it.getValue.asInstanceOf[Double] > 55)
    24. //take first 20 records
    25. .take(20)
    26. //print results
    27. .runWith(Sink.foreach[FluxRecord](it => println(s"Measurement: ${it.getMeasurement}, value: ${it.getValue}")))
    28. // wait to finish
    29. Await.result(sink, Duration.Inf)
    30. influxDBClient.close()
    31. system.terminate()
    32. }
    33. }

    The latest version for Maven dependency:

    Or when using with Gradle:

    1. dependencies {
    2. implementation "com.influxdb:influxdb-client-scala_2.12:5.0.0"
    3. }

    Scala 2.13

    The latest version for Maven dependency:

    1. <dependency>
    2. <groupId>com.influxdb</groupId>
    3. <artifactId>influxdb-client-scala_2.13</artifactId>
    4. <version>5.0.0</version>
    5. </dependency>

    Or when using with Gradle:

    1. dependencies {
    2. implementation "com.influxdb:influxdb-client-scala_2.13:5.0.0"
    3. }

    Snapshot Repository

    The snapshots are deployed into OSS Snapshot repository.

    Maven

    Scala - 图19Gradle

    1. repositories {
    2. maven { url "https://oss.sonatype.org/content/repositories/snapshots" }