You can create a file system result table to export data to a file system such as HDFS or OBS. After the data is generated, a non-DLI table can be created directly on the generated directory and then processed with DLI SQL, and the output data can be written to partitioned directories. This is applicable to scenarios such as data dumping, big data analysis, data backup, and active, deep, or cold archiving.
```sql
create table filesystemSink (
  attr_name attr_type (',' attr_name attr_type)*
)
with (
  'connector.type' = 'filesystem',
  'connector.file-path' = '',
  'format.type' = ''
);
```
| Parameter | Mandatory | Description |
|---|---|---|
| connector.type | Yes | The value is fixed to filesystem. |
| connector.file-path | Yes | Data output directory, in the format schema://file.path. NOTE: Currently, the schema supports only obs and hdfs. |
| format.type | Yes | Output data encoding format. Only parquet and csv are supported. |
| format.field-delimiter | No | Delimiter used to separate attributes. This parameter must be set when the CSV encoding format is used. It can be user-defined, for example, a comma (,). |
| connector.ak | No | Access key for accessing OBS. This parameter is mandatory when data is written to OBS. |
| connector.sk | No | Secret key for accessing OBS. This parameter is mandatory when data is written to OBS. |
| connector.partitioned-by | No | Partitioning field. Use commas (,) to separate multiple fields. |
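The optional CSV and partitioning parameters can be combined. The following is a minimal sketch of a CSV sink partitioned by a date column; the bucket path, the column names (including the dt partitioning field), and the delimiter value are illustrative assumptions, not values from this document.

```sql
-- Sketch only: writes comma-delimited CSV files under
-- obs://bucketName/output/dt=<value>/ directories, one per dt value.
create table csvPartitionedSink (
  id string,
  score double,
  dt string
) with (
  'connector.type' = 'filesystem',
  'connector.file-path' = 'obs://bucketName/output',
  'format.type' = 'csv',
  'format.field-delimiter' = ',',
  'connector.partitioned-by' = 'dt',
  'connector.ak' = 'xxxx',
  'connector.sk' = 'xxxxxx'
);
```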
Read data from Kafka and write the data in Parquet format to the fileName directory in the bucketName bucket.
```sql
create table kafkaSource (
  attr0 string,
  attr1 boolean,
  attr2 tinyint,
  attr3 smallint,
  attr4 int,
  attr5 bigint,
  attr6 float,
  attr7 double,
  attr8 timestamp(3),
  attr9 time
) with (
  'connector.type' = 'kafka',
  'connector.version' = '0.11',
  'connector.topic' = 'test_json',
  'connector.properties.bootstrap.servers' = 'xx.xx.xx.xx:9092',
  'connector.properties.group.id' = 'test_filesystem',
  'connector.startup-mode' = 'latest-offset',
  'format.type' = 'csv'
);

create table filesystemSink (
  attr0 string,
  attr1 boolean,
  attr2 tinyint,
  attr3 smallint,
  attr4 int,
  attr5 bigint,
  attr6 float,
  attr7 double,
  attr8 map<string, string>,
  attr9 timestamp(3),
  attr10 time
) with (
  'connector.type' = 'filesystem',
  'connector.file-path' = 'obs://bucketName/fileName',
  'format.type' = 'parquet',
  'connector.ak' = 'xxxx',
  'connector.sk' = 'xxxxxx'
);

insert into filesystemSink
select attr0, attr1, attr2, attr3, attr4, attr5, attr6, attr7,
       map[attr0, attr0], attr8, attr9
from kafkaSource;
```
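As noted in the introduction, once the job has written files, a table can be created directly on the generated directory and queried with DLI SQL. The following is a minimal sketch assuming the Parquet output above and the Spark-style datasource DDL that DLI SQL accepts; the table name and the column subset are illustrative (Parquet allows mapping a subset of columns by name).

```sql
-- Sketch only: expose the directory written by filesystemSink to DLI SQL.
-- The table name is hypothetical; columns must match those in the Parquet files.
create table filesystem_output (
  attr0 string,
  attr1 boolean,
  attr4 int,
  attr7 double
)
using parquet
options (path 'obs://bucketName/fileName');

-- The exported data can then be queried like any other table:
select attr0, attr4 from filesystem_output limit 10;
```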