Getting Started
You only need to have basic knowledge of SQL to follow along. No other programming experience is assumed.
There are multiple ways to install Flink. For experimentation, the most common option is to download the binaries and run them locally. You can follow the steps in the local installation guide to set up an environment for the rest of the tutorial.
Once you’re all set, use the following command to start a local cluster from the installation folder:
./bin/start-cluster.sh
Once the cluster has started, the Flink Web UI is available locally at localhost:8081, where you can monitor the different jobs.
The SQL Client is an interactive client for submitting SQL queries to Flink and visualizing the results. To start it, run the sql-client.sh script from the installation folder:
./bin/sql-client.sh
Once the SQL client, our query editor, is up and running, it’s time to start writing queries. Let’s start by printing ‘Hello World’ with the following simple query:
SELECT 'Hello World';
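Flink comes with a rich set of built-in functions, which you can list by running SHOW FUNCTIONS; in the SQL client. These functions give users a powerful toolbox when developing SQL queries. For example, CURRENT_TIMESTAMP returns the current system time of the machine where the query is executed.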
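To try this in the SQL client, the two statements below first list the available functions and then call CURRENT_TIMESTAMP. This is a minimal sketch; the exact list returned by SHOW FUNCTIONS depends on your Flink version and on any user-defined functions you have registered.

```sql
-- List the built-in (and any registered user-defined) functions.
SHOW FUNCTIONS;

-- Return the current system time of the machine executing the query.
SELECT CURRENT_TIMESTAMP;
```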
As with all SQL engines, Flink queries operate on top of tables. Flink differs from a traditional database because it does not manage data at rest locally; instead, its queries operate continuously over external tables.
Flink data processing pipelines begin with source tables. Source tables produce the rows that a query operates on during its execution; they are the tables referenced in the FROM clause of a query. These could be Kafka topics, databases, filesystems, or any other system that Flink knows how to consume.
Tables can be defined through the SQL client or using an environment config file. The SQL client supports SQL DDL commands similar to traditional SQL. Standard SQL DDL is used to create, alter, and drop tables.
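As a small illustration of that create, alter, and drop lifecycle, the sketch below uses a hypothetical scratch table named scratch_events backed by the datagen connector, which generates random rows and needs no external system. The table names are assumptions for illustration only, and the rename step assumes a catalog that supports renaming tables, such as the default in-memory catalog.

```sql
-- Create a throwaway table; 'datagen' produces random rows for testing.
CREATE TABLE scratch_events (
  id INT
) WITH (
  'connector' = 'datagen'
);

-- Rename the table (hypothetical names).
ALTER TABLE scratch_events RENAME TO scratch_events_v2;

-- Drop it again; IF EXISTS avoids an error if it is already gone.
DROP TABLE IF EXISTS scratch_events_v2;
```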
Flink supports different connectors and formats that can be used with tables. The following is an example of a CREATE TABLE statement that defines a source table backed by a CSV file, with emp_id, name, and dept_id as columns.
CREATE TABLE employee_information (
emp_id INT,
name VARCHAR,
dept_id INT
) WITH (
'connector' = 'filesystem',
'path' = '/path/to/something.csv',
'format' = 'csv'
);
A continuous query can be defined from this table that reads new rows as they are made available and immediately outputs their results. For example, we can filter for just those employees who work in department 1.
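The filter described above can be written as the following query over the employee_information table defined earlier:

```sql
-- Emit only the employees who work in department 1.
SELECT *
FROM employee_information
WHERE dept_id = 1;
```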
A continuous query never terminates and produces a dynamic table as its result. Dynamic tables are the core concept of Flink’s Table API and SQL support for streaming data.
Aggregations on continuous streams need to store aggregated results continuously during the execution of the query. For example, suppose you need to count the number of employees for each department from an incoming data stream. The query needs to maintain the most up-to-date count for each department to output timely results as new rows are processed.
SELECT
   dept_id,
   COUNT(*) as emp_count
FROM employee_information
GROUP BY dept_id;
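To see how the counts are revised as new rows arrive, you can switch the SQL client to changelog result mode before running the aggregation. This is a sketch assuming Flink 1.13 or later, where the client option is named sql-client.execution.result-mode; older releases configured the result mode differently.

```sql
-- Show every change (+I, -U, +U) instead of only the latest table view.
SET 'sql-client.execution.result-mode' = 'changelog';

-- Same per-department count as above.
SELECT
  dept_id,
  COUNT(*) AS emp_count
FROM employee_information
GROUP BY dept_id;
```

In this mode, each time a department’s count changes, the client shows a retraction of the old row (-U) followed by the updated row (+U), which makes the stateful nature of the query visible.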
Such queries are considered stateful. Flink’s advanced fault-tolerance mechanism maintains internal state and consistency, so queries always return the correct result, even in the face of hardware failure.
When running this query, the SQL client provides output in real time but in a read-only fashion. Storing results, for example to power a report or dashboard, requires writing them out to another table. This can be achieved using an INSERT INTO statement. The table referenced in this clause is known as a sink table. An INSERT INTO statement is submitted as a detached query to the Flink cluster.
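The INSERT INTO statement that follows writes to a table named department_counts, which must already exist. One minimal way to define it, assuming you only want to inspect the output, is the print connector, which writes every row it receives to the TaskManager’s standard output. Because the grouped COUNT(*) produces updating results, the sink must accept updates; the print connector does, whereas an insert-only sink such as a plain filesystem table would reject this query. The column types are chosen to match the query (COUNT(*) returns BIGINT in Flink).

```sql
-- Sink table for the per-department counts.
-- 'print' writes each row (including update changes) to TaskManager stdout.
CREATE TABLE department_counts (
  dept_id INT,
  emp_count BIGINT
) WITH (
  'connector' = 'print'
);
```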
INSERT INTO department_counts
SELECT
   dept_id,
   COUNT(*) as emp_count
FROM employee_information
GROUP BY dept_id;
Once submitted, the query runs and stores its results directly in the sink table instead of loading them into the system memory.
If you get stuck, check out the community support resources. In particular, Apache Flink’s user mailing list consistently ranks as one of the most active of any Apache project and is a great way to get help quickly.
- SQL: Supported operations and syntax for SQL.
- SQL Client: Play around with Flink SQL and submit a table program to a cluster without programming knowledge.
- Table API & SQL: Shared concepts and APIs of the Table API and SQL.
- Streaming Concepts: Streaming-specific documentation for the Table API or SQL such as configuration of time attributes and handling of updating results.
- Connectors: Available connectors and formats for reading and writing data to external systems.