Deduplication

    Deduplication removes rows that duplicate over a set of columns, keeping only the first or the last one. In some cases, the upstream ETL jobs are not end-to-end exactly-once, which may result in duplicate records in the sink after a failover. Duplicate records affect the correctness of downstream analytical jobs (e.g., COUNT), so deduplication is needed before further analysis.

    The following shows the syntax of the Deduplication statement:
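
    SELECT [column_list]
    FROM (
      SELECT [column_list],
        ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
          ORDER BY time_attr [asc|desc]) AS rownum
      FROM table_name)
    WHERE rownum = 1

    Here column_list and table_name are placeholders for the projected columns and the source table; col1, col2, time_attr, and rownum are described below.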

    • ROW_NUMBER(): Assigns a unique, sequential number to each row, starting with one.
    • PARTITION BY col1[, col2...]: Specifies the partition columns, i.e., the deduplication key.
    • ORDER BY time_attr [asc|desc]: Specifies the ordering column; it must be a time attribute. Currently Flink supports processing time attributes and event time attributes. Ordering by ASC means keeping the first row; ordering by DESC means keeping the last row.
    • WHERE rownum = 1: The rownum = 1 is required for Flink to recognize that the query performs deduplication.
    CREATE TABLE Orders (
      order_id  STRING,
      `user`    STRING,  -- quoted because USER is a reserved keyword in Flink SQL
      product   STRING,
      num       BIGINT,
      proctime AS PROCTIME()
    ) WITH (...);

    -- Remove duplicate rows on order_id, keeping only the first occurrence,
    -- because there should not be two orders with the same order_id.
    SELECT order_id, `user`, product, num
    FROM (
      SELECT *,
        ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY proctime ASC) AS row_num
      FROM Orders)
    WHERE row_num = 1
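
    As described above, ordering by DESC keeps the last row per key instead of the first. The following is a minimal variant of the query above, assuming the same Orders table, that keeps the latest record for each order_id:

    -- Keep only the most recent row per order_id (ordered by processing time, descending).
    SELECT order_id, `user`, product, num
    FROM (
      SELECT *,
        ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY proctime DESC) AS row_num
      FROM Orders)
    WHERE row_num = 1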