Deduplication
Deduplication removes rows that are duplicates over a set of columns, keeping only the first one or the last one. In some cases, upstream ETL jobs are not end-to-end exactly-once, so a failover may leave duplicate records in the sink. Duplicate records distort the results of downstream analytical jobs (e.g. COUNT), so deduplication is needed before further analysis.
A Deduplication statement is built from the following clauses:
ROW_NUMBER()
: Assigns a unique, sequential number to each row, starting with one.
PARTITION BY col1[, col2...]
: Specifies the partition columns, i.e. the deduplication key.
ORDER BY time_attr [asc|desc]
: Specifies the ordering column, which must be a time attribute. Currently Flink supports processing time and event time attributes. Ordering by ASC keeps the first row; ordering by DESC keeps the last row.
WHERE rownum = 1
: The rownum = 1 condition is required for Flink to recognize the query as deduplication.
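CREATE TABLE Orders (
  order_id    STRING,
  `user`      STRING,
  order_time  STRING,
  product     STRING,
  num         BIGINT,
  proctime AS PROCTIME()
) WITH (...);

-- Remove duplicate rows on order_id and keep the first one received,
-- since there should not be two orders with the same order_id.
SELECT order_id, `user`, product, num
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY proctime ASC) AS row_num
  FROM Orders)
WHERE row_num = 1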
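To keep the most recent row per order_id instead of the first, the same query can order by proctime descending, which per the ORDER BY description keeps the last row. This sketch assumes the Orders table from the example above. Because a later row can replace one that was already emitted, the result is an updating stream rather than an append-only one, so the sink generally needs to accept updates (for example, an upsert sink).

```sql
-- Keep the last row per order_id, ordered by processing time descending.
-- Assumes the Orders table defined in the example above.
SELECT order_id, `user`, product, num
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY proctime DESC) AS row_num
  FROM Orders)
WHERE row_num = 1
```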