Scala REPL

    To use the shell with an integrated Flink cluster, execute:

        bin/start-scala-shell.sh local

    in the root directory of your binary Flink directory. To run the shell on a cluster, see the Setup section below.

    The shell supports DataSet, DataStream, Table API and SQL. Four different Environments are automatically prebound after startup. Use “benv” and “senv” to access the Batch and Streaming ExecutionEnvironment respectively. Use “btenv” and “stenv” to access BatchTableEnvironment and StreamTableEnvironment respectively.

    DataSet API

    The following example executes the wordcount program in the Scala shell:

        Scala-Flink> val text = benv.fromElements(
          "To be, or not to be,--that is the question:--",
          "Whether 'tis nobler in the mind to suffer",
          "The slings and arrows of outrageous fortune",
          "Or to take arms against a sea of troubles,")
        Scala-Flink> val counts = text
            .flatMap { _.toLowerCase.split("\\W+") }
            .map { (_, 1) }.groupBy(0).sum(1)
        Scala-Flink> counts.print()

    The print() command will automatically send the specified tasks to the JobManager for execution and will show the result of the computation in the terminal.
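
    To sanity-check the counts that the batch example should print, the same flatMap, map, groupBy and sum steps can be sketched on plain Scala collections, without a cluster. This is only an illustration under that assumption, not Flink code: LocalWordCount and wordCount are hypothetical names introduced here, and the result is an ordinary Map rather than a DataSet.

```scala
// Plain Scala collections only; no Flink classes are used here.
// LocalWordCount and wordCount are hypothetical names for illustration.
object LocalWordCount {
  val lines: Seq[String] = Seq(
    "To be, or not to be,--that is the question:--",
    "Whether 'tis nobler in the mind to suffer",
    "The slings and arrows of outrageous fortune",
    "Or to take arms against a sea of troubles,")

  // Mirrors flatMap -> map -> groupBy(0) -> sum(1) from the shell example.
  def wordCount(input: Seq[String]): Map[String, Int] =
    input
      .flatMap(_.toLowerCase.split("\\W+"))   // split each line into lowercase words
      .map((_, 1))                            // pair every word with a count of 1
      .groupBy(_._1)                          // group the pairs by word
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) } // sum the counts

  def main(args: Array[String]): Unit =
    wordCount(lines).toSeq.sortBy(_._1).foreach(println)
}
```

    For the four input lines above, this sketch counts "to" four times, "the" three times and "be" twice, which is what the shell's counts.print() should report for those words.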

    It is also possible to write the results to a file. In that case, however, you must call execute to run your program:

        Scala-Flink> benv.execute("MyProgram")

    DataStream API

        Scala-Flink> val textStreaming = senv.fromElements(
          "To be, or not to be,--that is the question:--",
          "Whether 'tis nobler in the mind to suffer",
          "The slings and arrows of outrageous fortune",
          "Or to take arms against a sea of troubles,")
        Scala-Flink> val countsStreaming = textStreaming
            .flatMap { _.toLowerCase.split("\\W+") }
            .map { (_, 1) }.keyBy(0).sum(1)
        Scala-Flink> countsStreaming.print()
        Scala-Flink> senv.execute("Streaming Wordcount")

    Note that in the streaming case, the print operation does not trigger execution directly; the job only runs once senv.execute is called.
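
    The streaming job also differs in what it prints: the rolling aggregation keyBy(0).sum(1) emits an updated count each time a word arrives, rather than one final total per word. The following is a minimal sketch of that behaviour on plain Scala collections, not Flink code. RollingWordCount and rollingCounts are hypothetical names introduced for illustration, and a real parallel job gives no ordering guarantee across different words.

```scala
// Plain Scala simulation of a per-key rolling sum; no Flink classes are used.
// RollingWordCount and rollingCounts are hypothetical names for illustration.
object RollingWordCount {
  // Emits one (word, runningCount) pair per input word, in arrival order.
  def rollingCounts(words: Seq[String]): Seq[(String, Int)] = {
    val start = (Map.empty[String, Int], Vector.empty[(String, Int)])
    val (_, emitted) = words.foldLeft(start) {
      case ((state, out), word) =>
        val updated = state.getOrElse(word, 0) + 1          // new running count for this key
        (state.updated(word, updated), out :+ (word -> updated))
    }
    emitted
  }

  def main(args: Array[String]): Unit = {
    val words = "To be, or not to be".toLowerCase.split("\\W+").toSeq
    // prints (to,1) (be,1) (or,1) (not,1) (to,2) (be,2), one pair per line
    rollingCounts(words).foreach(println)
  }
}
```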

    The Flink Shell comes with command history and auto-completion.

    Table API

    The example below is a wordcount program using the Table API:

        Scala-Flink> import org.apache.flink.table.functions.TableFunction
        Scala-Flink> val textSource = stenv.fromDataStream(
          senv.fromElements(
            "To be, or not to be,--that is the question:--",
            "Whether 'tis nobler in the mind to suffer",
            "The slings and arrows of outrageous fortune",
            "Or to take arms against a sea of troubles,"),
          'text)
        Scala-Flink> class $Split extends TableFunction[String] {
            def eval(s: String): Unit = {
              s.toLowerCase.split("\\W+").foreach(collect)
            }
          }
        Scala-Flink> val split = new $Split
        Scala-Flink> textSource.join(split('text) as 'word).
            groupBy('word).select('word, 'word.count as 'count).
            toRetractStream[(String, Long)].print
        Scala-Flink> senv.execute("Table Wordcount")

    Note that prefixing the class name of the TableFunction with $ is a workaround for an issue where Scala generates an incorrect inner class name.

    SQL

    The following example is a wordcount program written in SQL:

        Scala-Flink> import org.apache.flink.table.functions.TableFunction
        Scala-Flink> val textSource = stenv.fromDataStream(
          senv.fromElements(
            "Whether 'tis nobler in the mind to suffer",
            "The slings and arrows of outrageous fortune",
            "Or to take arms against a sea of troubles,"),
          'text)
        Scala-Flink> stenv.registerTable("text_source", textSource)
        Scala-Flink> class $Split extends TableFunction[String] {
            def eval(s: String): Unit = {
              s.toLowerCase.split("\\W+").foreach(collect)
            }
          }
        Scala-Flink> stenv.registerFunction("split", new $Split)
        Scala-Flink> val result = stenv.sqlQuery("""SELECT T.word, count(T.word) AS `count`
            FROM text_source
            JOIN LATERAL table(split(text)) AS T(word)
            ON TRUE
            GROUP BY T.word""")
        Scala-Flink> result.toRetractStream[(String, Long)].print
        Scala-Flink> senv.execute("SQL Wordcount")

    The same query can be run as a batch program by using btenv and benv instead:

        Scala-Flink> import org.apache.flink.table.functions.TableFunction
        Scala-Flink> val textSource = btenv.fromDataSet(
          benv.fromElements(
            "To be, or not to be,--that is the question:--",
            "Whether 'tis nobler in the mind to suffer",
            "The slings and arrows of outrageous fortune",
            "Or to take arms against a sea of troubles,"),
          'text)
        Scala-Flink> btenv.registerTable("text_source", textSource)
        Scala-Flink> class $Split extends TableFunction[String] {
            def eval(s: String): Unit = {
              s.toLowerCase.split("\\W+").foreach(collect)
            }
          }
        Scala-Flink> btenv.registerFunction("split", new $Split)
        Scala-Flink> val result = btenv.sqlQuery("""SELECT T.word, count(T.word) AS `count`
            FROM text_source
            JOIN LATERAL table(split(text)) AS T(word)
            ON TRUE
            GROUP BY T.word""")
        Scala-Flink> result.toDataSet[(String, Long)].print

    It is possible to add external classpaths to the Scala shell with the -a <path/to/jar> (or --addclasspath <path/to/jar>) option. These are sent to the JobManager automatically alongside your shell program when execute is called.

    Setup

    To get an overview of the options the Scala shell provides, run:

        bin/start-scala-shell.sh --help

    Local

    To use the shell with an integrated Flink cluster, execute:

        bin/start-scala-shell.sh local

    Remote

    To use it with a running cluster, start the Scala shell with the keyword remote and supply the host and port of the JobManager:

        bin/start-scala-shell.sh remote <hostname> <portnumber>

    Yarn Scala Shell Cluster

    The shell can deploy a Flink cluster to YARN that is used exclusively by the shell. The number of YARN containers can be controlled with the parameter -n <arg>. The shell deploys a new Flink cluster on YARN and connects to it. You can also specify options for the YARN cluster, such as the memory for the JobManager, the name of the YARN application, etc.

    For example, to start a YARN cluster for the Scala shell with two TaskManagers, use the following:

        bin/start-scala-shell.sh yarn -n 2

    For all other options, see the full reference at the bottom.

    Yarn Session

    To connect the shell to a Flink cluster that was previously deployed with a Flink YARN session, run:

        bin/start-scala-shell.sh yarn

    Full Reference

        Flink Scala Shell
        Usage: start-scala-shell.sh [local|remote|yarn] [options] <args>...

        Command: local [options]
        Starts Flink scala shell with a local Flink cluster
          -a <path/to/jar> | --addclasspath <path/to/jar>
                Specifies additional jars to be used in Flink
        Command: remote [options] <host> <port>
        Starts Flink scala shell connecting to a remote cluster
          <host>
                Remote host name as string
          <port>
                Remote port as integer
          -a <path/to/jar> | --addclasspath <path/to/jar>
                Specifies additional jars to be used in Flink
        Command: yarn [options]
        Starts Flink scala shell connecting to a yarn cluster
          -n arg | --container arg
                Number of YARN container to allocate (= Number of TaskManagers)
          -jm arg | --jobManagerMemory arg
                Memory for JobManager container with optional unit (default: MB)
          -nm <value> | --name <value>
                Set a custom name for the application on YARN
          -qu <arg> | --queue <arg>
                Specifies YARN queue
          -s <arg> | --slots <arg>
                Number of slots per TaskManager
          -tm <arg> | --taskManagerMemory <arg>
                Memory per TaskManager container with optional unit (default: MB)
          -a <path/to/jar> | --addclasspath <path/to/jar>
                Specifies additional jars to be used in Flink
          --configDir <value>
                The configuration directory.
          -h | --help