Data Types

    If a type has not been declared, data is serialized and deserialized using Pickle.
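To make the fallback concrete, the sketch below round-trips untyped tuples through Python's standard pickle module. It only illustrates what Pickle does with records that carry no declared type; it is not PyFlink's internal code path, and the helper name `roundtrip` is hypothetical.

```python
import pickle

# Records with no declared type, like the tuples a Python map() might emit.
records = [(1, 'aaa'), (2, 'bbb')]


def roundtrip(record):
    """Serialize a record to bytes with pickle, then restore it."""
    payload = pickle.dumps(record)  # opaque bytes; no schema is attached
    return pickle.loads(payload)


restored = [roundtrip(r) for r in records]
print(restored)
```

Pickle can restore any picklable Python object without a schema, but the resulting bytes are meaningful only to Python.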

    However, types need to be specified in the following cases:

    • Passing Python records to Java operations.
    • Improving serialization and deserialization performance.

    Passing Python records to Java operations

    Java operators and functions cannot interpret Python data, so types must be provided to convert Python values to Java types for processing. For example, types are required when writing output with StreamingFileSink, which is implemented in Java.

    from pyflink.common.serialization import Encoder
    from pyflink.common.typeinfo import Types
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors import StreamingFileSink


    def streaming_file_sink():
        env = StreamExecutionEnvironment.get_execution_environment()
        env.set_parallelism(1)
        env.from_collection(collection=[(1, 'aaa'), (2, 'bbb')]) \
            .map(lambda record: (record[0]+1, record[1].upper()),
                 # Declare the output type so the Java sink can read the records.
                 output_type=Types.ROW([Types.INT(), Types.STRING()])) \
            .add_sink(StreamingFileSink
                      .for_row_format('/tmp/output', Encoder.simple_string_encoder())
                      .build())
        env.execute()


    if __name__ == '__main__':
        streaming_file_sink()

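    Improve serialization and deserialization performance

    Although data can be serialized and deserialized with Pickle, performance is better when types are provided.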
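One way to see why declared types can help is to compare a schema-free Pickle payload with a fixed-layout encoding of the same record. The sketch below is only an illustration built on Python's standard struct module; the byte layout and the helper names `encode_typed` and `decode_typed` are hypothetical and do not reflect PyFlink's actual serializers.

```python
import pickle
import struct


def encode_typed(record):
    """Encode an (int, str) record with a fixed, known layout.

    Layout (an assumption for this sketch): 4-byte big-endian int,
    2-byte string length, then the UTF-8 bytes of the string.
    """
    number, text = record
    data = text.encode('utf-8')
    return struct.pack('>iH', number, len(data)) + data


def decode_typed(payload):
    """Decode bytes produced by encode_typed back into an (int, str) tuple."""
    number, length = struct.unpack_from('>iH', payload, 0)
    text = payload[6:6 + length].decode('utf-8')
    return number, text


record = (2, 'AAA')
typed_bytes = encode_typed(record)
pickled_bytes = pickle.dumps(record)

# The typed payload carries no type tags, so it is smaller for this record.
print(len(typed_bytes), len(pickled_bytes))
```

Because both sides agree on the layout in advance, the typed encoding does not have to describe the structure of each record, which is the general idea behind supplying type information up front.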

    Supported Data Types

    Use pyflink.common.typeinfo.Types to define types in the Python DataStream API. The table below lists the currently supported array types and how to define them:

    PyFlink Array Type                      Python Type    Java Type
    --------------------------------------  -------------  ---------------------
    Types.PRIMITIVE_ARRAY(Types.BYTE())     bytes          byte[]
    Types.PRIMITIVE_ARRAY(Types.BOOLEAN())  list of bool   boolean[]
    Types.PRIMITIVE_ARRAY(Types.SHORT())    list of int    short[]
    Types.PRIMITIVE_ARRAY(Types.INT())      list of int    int[]
    Types.PRIMITIVE_ARRAY(Types.LONG())     list of int    long[]
    Types.PRIMITIVE_ARRAY(Types.FLOAT())    list of float  float[]
    Types.PRIMITIVE_ARRAY(Types.DOUBLE())   list of float  double[]
    Types.PRIMITIVE_ARRAY(Types.CHAR())     list of str    char[]
    Types.BASIC_ARRAY(Types.BYTE())         list of int    java.lang.Byte[]
    Types.BASIC_ARRAY(Types.BOOLEAN())      list of bool   java.lang.Boolean[]
    Types.BASIC_ARRAY(Types.SHORT())        list of int    java.lang.Short[]
    Types.BASIC_ARRAY(Types.INT())          list of int    java.lang.Integer[]
    Types.BASIC_ARRAY(Types.LONG())         list of int    java.lang.Long[]
    Types.BASIC_ARRAY(Types.FLOAT())        list of float  java.lang.Float[]
    Types.BASIC_ARRAY(Types.DOUBLE())       list of float  java.lang.Double[]
    Types.BASIC_ARRAY(Types.CHAR())         list of str    java.lang.Character[]
    Types.BASIC_ARRAY(Types.STRING())       list of str    java.lang.String[]
    Types.OBJECT_ARRAY()                                   Array