Local Setup Tutorial

    Flink runs on Linux, Mac OS X, and Windows. The only requirement for running Flink is a working Java 8.x installation. Windows users should take a look at the Flink on Windows guide, which describes how to run Flink locally on Windows.

    You can check that Java is installed correctly by issuing the following command:

        $ java -version

    If you have Java 8, the output will look something like this:

        Java(TM) SE Runtime Environment (build 1.8.0_111-b14)
        Java HotSpot(TM) 64-Bit Server VM (build 25.111-b14, mixed mode)
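    The same check can be done from inside a JVM, for example in a setup script, by reading the java.version system property. The sketch below is illustrative and not part of Flink; the class name JavaVersionCheck is hypothetical. It handles both the legacy "1.8.0_111" scheme used by Java 8 and the "11.0.2" scheme used from Java 9 on.

```java
public class JavaVersionCheck {

    // Extracts the major version from a java.version string.
    // Java 8 and earlier use "1.x..." ("1.8.0_111" -> 8);
    // Java 9+ start with the major number ("11.0.2" -> 11, "17" -> 17).
    static int majorVersion(String version) {
        String[] parts = version.split("[._+-]");
        int first = Integer.parseInt(parts[0]);
        return (first == 1 && parts.length > 1) ? Integer.parseInt(parts[1]) : first;
    }

    public static void main(String[] args) {
        String version = System.getProperty("java.version");
        int major = majorVersion(version);
        System.out.println("Running on Java " + version + " (major version " + major + ")");
        if (major != 8) {
            // This guide assumes Java 8.x
            System.out.println("Warning: this guide assumes a Java 8.x installation.");
        }
    }
}
```

    Compile and run it with the same java binary that Flink will use, so that the reported version matches the one Flink picks up.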
    • Download a binary from the downloads page. You can pick any Scala variant you like. For certain features you may also have to download one of the pre-bundled Hadoop jars and place them into the /lib directory.
    • Go to the download directory.
    • Unpack the downloaded archive.

        $ cd ~/Downloads        # Go to download directory
        $ tar xzf flink-*.tgz   # Unpack the downloaded archive
        $ cd flink-1.9.0

    Mac OS X users can also install Flink through Homebrew:

        $ brew install apache-flink
        ...
        $ flink --version
        Version: 1.2.0, Commit ID: 1c659cf

    After starting a local cluster with ./bin/start-cluster.sh, you can verify that it is running by checking the log files in the log directory:

        $ tail log/flink-*-standalonesession-*.log
        INFO ... - Rest endpoint listening at localhost:8081
        INFO ... - http://localhost:8081 was granted leadership ...
        INFO ... - Web frontend listening at http://localhost:8081.
        INFO ... - Starting RPC endpoint for StandaloneResourceManager at akka://flink/user/resourcemanager .
        INFO ... - Starting RPC endpoint for StandaloneDispatcher at akka://flink/user/dispatcher .
        INFO ... - ResourceManager akka.tcp://flink@localhost:6123/user/resourcemanager was granted leadership ...
        INFO ... - Starting the SlotManager.
        INFO ... - Dispatcher akka.tcp://flink@localhost:6123/user/dispatcher was granted leadership ...
        INFO ... - Recovering all persisted jobs.
        INFO ... - Registering TaskManager ... at ResourceManager
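    This check can be automated. The sketch below is a hypothetical helper, not part of Flink: it reads one log file and reports whether two of the markers shown above are present. The marker strings are copied from the sample output and are assumptions; their exact wording may differ between Flink versions.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;

public class StartupLogCheck {

    // Marker strings copied from the sample log; assumed, may change between Flink versions.
    static final List<String> MARKERS = Arrays.asList(
            "Rest endpoint listening at",   // REST endpoint / web frontend is up
            "Registering TaskManager");     // at least one TaskManager joined the ResourceManager

    // Returns true if every marker occurs in at least one of the given log lines.
    static boolean looksStarted(List<String> logLines) {
        return MARKERS.stream()
                .allMatch(marker -> logLines.stream().anyMatch(line -> line.contains(marker)));
    }

    public static void main(String[] args) throws IOException {
        // Pass the path of one file matched by log/flink-*-standalonesession-*.log
        List<String> lines = Files.readAllLines(Paths.get(args[0]));
        System.out.println(looksStarted(lines)
                ? "Cluster appears to be running"
                : "Startup markers not found (yet)");
    }
}
```

    Checking for the TaskManager registration as well as the REST endpoint matters because a cluster without a registered TaskManager has no slots to run the example job.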

    You can find the complete source code for this SocketWindowWordCount example in Scala and Java on GitHub.

    Scala:

        import org.apache.flink.api.java.utils.ParameterTool
        import org.apache.flink.streaming.api.scala._
        import org.apache.flink.streaming.api.windowing.time.Time

        object SocketWindowWordCount {

          def main(args: Array[String]): Unit = {

            // the port to connect to
            val port: Int = try {
              ParameterTool.fromArgs(args).getInt("port")
            } catch {
              case e: Exception =>
                System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'")
                return
            }

            // get the execution environment
            val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

            // get input data by connecting to the socket
            val text = env.socketTextStream("localhost", port, '\n')

            // parse the data, group it, window it, and aggregate the counts
            val windowCounts = text
              .flatMap { w => w.split("\\s") }
              .map { w => WordWithCount(w, 1) }
              .keyBy("word")
              .timeWindow(Time.seconds(5)) // tumbling 5-second processing-time window
              .sum("count")

            // print the results with a single thread, rather than in parallel
            windowCounts.print().setParallelism(1)

            env.execute("Socket Window WordCount")
          }

          // Data type for words with count
          case class WordWithCount(word: String, count: Long)
        }

    Java:

        import org.apache.flink.api.common.functions.FlatMapFunction;
        import org.apache.flink.api.common.functions.ReduceFunction;
        import org.apache.flink.api.java.utils.ParameterTool;
        import org.apache.flink.streaming.api.datastream.DataStream;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        import org.apache.flink.streaming.api.windowing.time.Time;
        import org.apache.flink.util.Collector;

        public class SocketWindowWordCount {

            public static void main(String[] args) throws Exception {

                // the port to connect to
                final int port;
                try {
                    final ParameterTool params = ParameterTool.fromArgs(args);
                    port = params.getInt("port");
                } catch (Exception e) {
                    System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'");
                    return;
                }

                // get the execution environment
                final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

                // get input data by connecting to the socket
                DataStream<String> text = env.socketTextStream("localhost", port, "\n");

                // parse the data, group it, window it, and aggregate the counts
                DataStream<WordWithCount> windowCounts = text
                    .flatMap(new FlatMapFunction<String, WordWithCount>() {
                        @Override
                        public void flatMap(String value, Collector<WordWithCount> out) {
                            // emit (word, 1) for every whitespace-separated token
                            for (String word : value.split("\\s")) {
                                out.collect(new WordWithCount(word, 1L));
                            }
                        }
                    })
                    .keyBy("word")
                    .timeWindow(Time.seconds(5)) // tumbling 5-second processing-time window
                    .reduce(new ReduceFunction<WordWithCount>() {
                        @Override
                        public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                            return new WordWithCount(a.word, a.count + b.count);
                        }
                    });

                // print the results with a single thread, rather than in parallel
                windowCounts.print().setParallelism(1);

                env.execute("Socket Window WordCount");
            }

            // Data type for words with count
            public static class WordWithCount {

                public String word;
                public long count;

                public WordWithCount() {}

                public WordWithCount(String word, long count) {
                    this.word = word;
                    this.count = count;
                }

                @Override
                public String toString() {
                    return word + " : " + count;
                }
            }
        }
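    Within a single window, the flatMap/keyBy/reduce chain amounts to a per-word sum. The plain-Java sketch below has no Flink dependency (the class name WordCountMirror is hypothetical); it applies the same tokenizer and summation to the lines of one window, which is a quick way to predict what the job should print.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WordCountMirror {

    // Mirrors flatMap (split on single whitespace characters), keyBy("word")
    // and the reduce that adds counts, for all lines that fall into ONE window.
    // A TreeMap keeps the result sorted by word.
    static Map<String, Long> countWords(List<String> lines) {
        Map<String, Long> counts = new TreeMap<>();
        for (String line : lines) {
            for (String word : line.split("\\s")) {   // same tokenizer as the Flink example
                counts.merge(word, 1L, Long::sum);    // reduce: a.count + b.count
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> window = Arrays.asList("lorem ipsum", "ipsum ipsum ipsum", "bye");
        countWords(window).forEach((word, count) -> System.out.println(word + " : " + count));
    }
}
```

    Run on the three input lines used in the walkthrough below, it prints bye : 1, ipsum : 4 and lorem : 1. Note that split("\\s") splits on every single whitespace character, so two consecutive spaces yield an empty-string token, which the Flink example counts as a word too.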

    Now we are going to run this Flink application. It reads text from a socket and, once every 5 seconds, prints the number of occurrences of each distinct word during the previous 5 seconds, i.e. a tumbling window of processing time, as long as words keep arriving.
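    The difference between a tumbling and a sliding window comes down to how many windows each element is assigned to. The sketch below computes window start times in plain Java, assuming millisecond timestamps of at least zero and no window offset; it is intended to mirror the arithmetic of Flink's tumbling and sliding processing-time assigners under those assumptions, but it is not Flink code, and the class name WindowAssignment is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class WindowAssignment {

    // Start of the single tumbling window [start, start + size) containing `timestamp`.
    // Assumes timestamp >= 0 and a window offset of zero.
    static long tumblingStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    // Starts of all sliding windows [start, start + size) containing `timestamp`,
    // latest first. With size = 5 s and slide = 1 s, each element lands in 5 windows.
    static List<Long> slidingStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        for (long start = timestamp - (timestamp % slide); start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long ts = 12_345L; // arbitrary processing time in milliseconds
        long start = tumblingStart(ts, 5_000L);
        System.out.println("tumbling 5 s:      [" + start + ", " + (start + 5_000L) + ")");
        System.out.println("sliding 5 s / 1 s: " + slidingStarts(ts, 5_000L, 1_000L));
    }
}
```

    For ts = 12345 the tumbling window is [10000, 15000), while the sliding windows start at 12000, 11000, 10000, 9000 and 8000. With a tumbling window such as timeWindow(Time.seconds(5)) each word is therefore counted once per 5-second window; with a sliding window such as timeWindow(Time.seconds(5), Time.seconds(1)) the same word falls into five overlapping windows and can be printed up to five times.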

    • First, use netcat to start a local server:

        $ nc -l 9000

    • Submit the Flink program:

        $ ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000
        Starting execution of program

    [Figure: Dispatcher: Overview (cont'd)]

    • Words are counted in time windows of 5 seconds (processing time, tumbling windows) and are printed to stdout. Monitor the TaskManager's output file and write some text in nc (input is sent to Flink line by line after pressing Enter):

        $ nc -l 9000
        lorem ipsum
        ipsum ipsum ipsum
        bye

    The .out file prints the counts at the end of each time window, as long as words keep arriving, e.g.:

        $ tail -f log/flink-*-taskexecutor-*.out
        lorem : 1
        ipsum : 4
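    If netcat is not available (for example on a Windows machine), the nc -l 9000 step can be approximated with a few lines of Java. This is a hypothetical stand-in, not a Flink tool: LineServer listens on a port (9000 by default), waits for the job's socket source to connect, and forwards each line typed on stdin, terminated by "\n", the delimiter the example passes to socketTextStream.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class LineServer {

    // Copies each line from `in` to `out`, terminated by '\n', and returns the number of lines sent.
    static int forwardLines(BufferedReader in, OutputStream out) {
        int sent = 0;
        try {
            String line;
            while ((line = in.readLine()) != null) {   // readLine strips "\n" or "\r\n"
                out.write((line + "\n").getBytes(StandardCharsets.UTF_8));
                out.flush();                           // send each line immediately, like nc
                sent++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sent;
    }

    public static void main(String[] args) throws IOException {
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 9000;
        // Listen like `nc -l 9000` and wait for Flink's socket source to connect as a client.
        try (ServerSocket server = new ServerSocket(port);
             Socket client = server.accept()) {
            BufferedReader stdin = new BufferedReader(new InputStreamReader(System.in, StandardCharsets.UTF_8));
            forwardLines(stdin, client.getOutputStream());
        }
    }
}
```

    As with nc, start it (java LineServer 9000) before submitting the job, because the socket source connects to an already listening server; then type lines into it.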

    To stop Flink when you’re done, type: