Migrate data from InfluxDB Cloud to InfluxDB OSS

The following guide provides instructions for setting up an InfluxDB OSS task that queries data from an InfluxDB Cloud bucket in time-based batches and writes each batch to an InfluxDB OSS bucket.

All queries against data in InfluxDB Cloud are subject to your organization’s rate limits and adjustable quotas.

  1. Install and set up InfluxDB OSS.

  2. In InfluxDB Cloud, create an API token with read access to the bucket you want to migrate.

  3. In InfluxDB OSS:

    1. Add your InfluxDB Cloud API token as a secret using the key INFLUXDB_CLOUD_TOKEN. See Add secrets for more information, or use the influx CLI as shown in the sketch after this list.

    2. Create a bucket to migrate data to.

    3. Create a bucket to store temporary migration metadata.

    4. Create a new task using the provided migration Flux script below. Update the necessary migration configuration options.

    5. (Optional) Set up migration monitoring.

    6. Save the task.

      Newly-created tasks are enabled by default, so the data migration begins when you save the task.
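
If you prefer the influx CLI to the UI for the setup steps above, the following sketch shows one way to do them. The bucket names match the example script below; the Cloud bucket ID and token values are placeholders you must supply.

  # In InfluxDB Cloud: create an API token with read access to the source bucket
  influx auth create \
    --host https://cloud2.influxdata.com \
    --org example-cloud-org \
    --read-bucket <cloud_bucket_id>

  # In InfluxDB OSS: store the Cloud API token as a secret under the expected key
  influx secret update --key INFLUXDB_CLOUD_TOKEN --value <your_influxdb_cloud_token>

  # In InfluxDB OSS: create the destination bucket and the migration metadata bucket
  influx bucket create --name example-oss-bucket
  influx bucket create --name migration

Then, in the migration task script: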

  1. Specify how often you want the task to run using the task.every option. See Determine your task interval.

  2. Define the following properties in the migration record:
    • start: Earliest time to include in the migration. See Determine your migration start time.
    • stop: Latest time to include in the migration.
    • batchInterval: Duration of each time-based batch. See Determine your batch interval.
    • batchBucket: InfluxDB OSS bucket to store migration batch metadata in.
    • sourceHost: InfluxDB Cloud region URL to migrate data from.
    • sourceOrg: InfluxDB Cloud organization to migrate data from.
    • sourceToken: InfluxDB Cloud API token. To keep the API token secure, store it as a secret in InfluxDB OSS.
    • sourceBucket: InfluxDB Cloud bucket to migrate data from.
    • destinationBucket: InfluxDB OSS bucket to migrate data to.

Migration Flux script

  import "array"
  import "experimental"
  import "influxdata/influxdb/secrets"

  // Configure the task
  option task = {every: 5m, name: "Migrate data from InfluxDB Cloud"}

  // Configure the migration
  migration = {
      start: 2022-01-01T00:00:00Z,
      stop: 2022-02-01T00:00:00Z,
      batchInterval: 1h,
      batchBucket: "migration",
      sourceHost: "https://cloud2.influxdata.com",
      sourceOrg: "example-cloud-org",
      sourceToken: secrets.get(key: "INFLUXDB_CLOUD_TOKEN"),
      sourceBucket: "example-cloud-bucket",
      destinationBucket: "example-oss-bucket",
  }

  // batchRange dynamically returns a record with start and stop properties for
  // the current batch. It queries migration metadata stored in the
  // `migration.batchBucket` to determine the stop time of the previous batch.
  // It uses the previous stop time as the new start time for the current batch
  // and adds the `migration.batchInterval` to determine the current batch stop time.
  batchRange = () => {
      _lastBatchStop =
          (from(bucket: migration.batchBucket)
              |> range(start: migration.start)
              |> filter(fn: (r) => r._field == "batch_stop")
              |> filter(fn: (r) => r.srcOrg == migration.sourceOrg)
              |> filter(fn: (r) => r.srcBucket == migration.sourceBucket)
              |> last()
              |> findRecord(fn: (key) => true, idx: 0))._value
      _batchStart =
          if exists _lastBatchStop then
              time(v: _lastBatchStop)
          else
              migration.start

      return {start: _batchStart, stop: experimental.addDuration(d: migration.batchInterval, to: _batchStart)}
  }

  // Define a static record with batch start and stop time properties
  batch = batchRange()

  // Check to see if the current batch start time is beyond the migration.stop
  // time and exit with an error if it is.
  finished =
      if batch.start >= migration.stop then
          die(msg: "Batch range is beyond the migration range. Migration is complete.")
      else
          "Migration in progress"

  // Query all data from the specified source bucket within the batch-defined time
  // range. To limit migrated data by measurement, tag, or field, add a `filter()`
  // function after `range()` with the appropriate predicate fn.
  data = () =>
      from(
          host: migration.sourceHost,
          org: migration.sourceOrg,
          token: migration.sourceToken,
          bucket: migration.sourceBucket,
      )
          |> range(start: batch.start, stop: batch.stop)

  // rowCount is a stream of tables that contains the number of rows returned in
  // the batch and is used to generate batch metadata.
  rowCount =
      data()
          |> group(columns: ["_start", "_stop"])
          |> count()

  // emptyRange is a stream of tables that acts as filler data if the batch is
  // empty. This is used to generate batch metadata for empty batches and is
  // necessary to correctly increment the time range for the next batch.
  emptyRange = array.from(rows: [{_start: batch.start, _stop: batch.stop, _value: 0}])

  // metadata returns a stream of tables representing batch metadata.
  metadata = () => {
      _input =
          if exists (rowCount |> findRecord(fn: (key) => true, idx: 0))._value then
              rowCount
          else
              emptyRange

      return
          _input
              |> map(
                  fn: (r) =>
                      ({
                          _time: now(),
                          _measurement: "batches",
                          srcOrg: migration.sourceOrg,
                          srcBucket: migration.sourceBucket,
                          dstBucket: migration.destinationBucket,
                          batch_stop: string(v: batch.stop),
                          rows: r._value,
                          percent_complete:
                              float(v: int(v: r._stop) - int(v: migration.start)) / float(
                                      v: int(v: migration.stop) - int(v: migration.start),
                                  ) * 100.0,
                      }),
              )
              |> group(columns: ["_measurement", "srcOrg", "srcBucket", "dstBucket"])
  }

  // Write the queried data to the specified InfluxDB OSS bucket.
  data()
      |> to(bucket: migration.destinationBucket)

  // Generate and store batch metadata in the migration.batchBucket.
  metadata()
      |> experimental.to(bucket: migration.batchBucket)
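
As the comments in the script note, you can limit what gets migrated by adding a filter() call to the batch query. For example, a sketch of a modified data() function that migrates a single measurement (the measurement name here is a hypothetical example):

  data = () =>
      from(
          host: migration.sourceHost,
          org: migration.sourceOrg,
          token: migration.sourceToken,
          bucket: migration.sourceBucket,
      )
          |> range(start: batch.start, stop: batch.stop)
          // Hypothetical measurement name; replace with your own
          |> filter(fn: (r) => r._measurement == "example-measurement")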

Determine your task interval

The task interval determines how often the migration task runs and is defined by the task.every option. InfluxDB Cloud rate limits and quotas reset every five minutes, so we recommend a 5m task interval.

You can use a shorter task interval to execute the migration task more often, but you need to balance the task interval against your batch interval and the amount of data returned in each batch. If the total amount of data queried in each five-minute interval exceeds your InfluxDB Cloud organization’s rate limits and quotas, the batch will fail until the limits and quotas reset.
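
If you do opt for a shorter interval, the change is confined to the task options at the top of the script; for example, a sketch using an arbitrary 2m value:

  // Run more often; pair this with a smaller migration.batchInterval
  option task = {every: 2m, name: "Migrate data from InfluxDB Cloud"}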

Determine your migration start time

The migration.start time should be at or near the same time as the earliest data point you want to migrate. All migration batches are determined using the migration.start time and migration.batchInterval settings.

To find the time of the earliest point in your bucket, run the following query:
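
  // Bucket name is the example used in the script; substitute your own
  from(bucket: "example-cloud-bucket")
      |> range(start: 0)
      |> group()
      |> first()
      |> keep(columns: ["_time"])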

Determine your batch interval

The migration.batchInterval setting controls the time range queried by each batch. The “density” of the data in your InfluxDB Cloud bucket and your InfluxDB Cloud organization’s rate limits and quotas determine what your batch interval should be.

For example, if you’re migrating data collected from hundreds of sensors with points recorded every second, your batch interval will need to be shorter. If you’re migrating data collected from five sensors with points recorded every minute, your batch interval can be longer. It all depends on how much data gets returned in a single batch.

If points occur at regular intervals, you can get a fairly accurate estimate of how much data will be returned in a given time range by using the /api/v2/query endpoint to execute a query for the time range duration and then measuring the size of the response body.

  INFLUXDB_CLOUD_ORG=<your_influxdb_cloud_org>
  INFLUXDB_CLOUD_TOKEN=<your_influxdb_cloud_token>
  INFLUXDB_CLOUD_BUCKET=<your_influxdb_cloud_bucket>

  curl -so /dev/null --request POST \
    https://cloud2.influxdata.com/api/v2/query?org=$INFLUXDB_CLOUD_ORG \
    --header "Authorization: Token $INFLUXDB_CLOUD_TOKEN" \
    --header "Accept: application/csv" \
    --header "Content-type: application/vnd.flux" \
    --data "from(bucket:\"$INFLUXDB_CLOUD_BUCKET\") |> range(start: -1d, stop: now())" \
    --write-out '%{size_download}'

You can also use other HTTP API tools, such as Postman, that provide the size of the response body.

Divide the output of this command by 1000000 to convert it to megabytes (MB).

For example, if the response body of your query that returns data from one day is 8 MB and you’re using the InfluxDB Cloud Free Plan with a read limit of 300 MB per five minutes:

  batchInterval = (300 / 8) * 1d
  // batchInterval = 37d

You could query 37 days of data before hitting your read limit, but this is just an estimate. We recommend setting the batchInterval slightly lower than the calculated interval to allow for variation between batches. So in this example, it would be best to set your batchInterval to 35d.

Important things to note
  • This assumes no other queries are running in your InfluxDB Cloud organization.
  • You should also consider your network speeds and whether a batch can be fully downloaded within the task interval.

The InfluxDB Cloud Migration Community template installs the migration task outlined in this guide as well as a dashboard for monitoring running data migrations.
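
Community templates are installed with the influx CLI's apply command. A sketch, with the template URL left as a placeholder rather than guessed:

  influx apply --file <influxdb_cloud_migration_template_url>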

Troubleshoot migration task failures

If the migration task fails, view your task logs to identify the specific error. Below are common causes of migration task failures.

Exceeded rate limits

If your data migration causes you to exceed your InfluxDB Cloud organization’s limits and quotas, the task fails with a rate limit error (HTTP 429, too many requests).

Possible solutions:

  • Update the migration.batchInterval setting in your migration task to use a smaller interval. Each batch will then query less data.
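
For example, halving the script's 1h default batch interval means updating one property in the migration record (30m is an arbitrary value; tune it to your data density):

  import "influxdata/influxdb/secrets"

  migration = {
      start: 2022-01-01T00:00:00Z,
      stop: 2022-02-01T00:00:00Z,
      batchInterval: 30m,  // reduced from 1h so each batch queries less data
      batchBucket: "migration",
      sourceHost: "https://cloud2.influxdata.com",
      sourceOrg: "example-cloud-org",
      sourceToken: secrets.get(key: "INFLUXDB_CLOUD_TOKEN"),
      sourceBucket: "example-cloud-bucket",
      destinationBucket: "example-oss-bucket",
  }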

Invalid API token

If the API token you store as the INFLUXDB_CLOUD_TOKEN secret doesn’t have read access to your InfluxDB Cloud bucket, the task will return an error similar to:

  unauthorized access

Possible solutions:

  • Ensure the API token has read access to your InfluxDB Cloud bucket.
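
To check the token outside the task, you can run a minimal read against the Cloud API, reusing the variables from the earlier curl example; an unauthorized token returns an error body instead of CSV results:

  curl --request POST \
    https://cloud2.influxdata.com/api/v2/query?org=$INFLUXDB_CLOUD_ORG \
    --header "Authorization: Token $INFLUXDB_CLOUD_TOKEN" \
    --header "Accept: application/csv" \
    --header "Content-type: application/vnd.flux" \
    --data "from(bucket:\"$INFLUXDB_CLOUD_BUCKET\") |> range(start: -1h) |> limit(n: 1)"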

Query timeout

If a batch queries more data than can be returned within the query timeout, the task run fails before results are returned.

Possible solutions:

  • Update the migration.batchInterval setting in your migration task to use a smaller interval. Each batch will then query less data and take less time to return results.