JDBC

Function

The JDBC connector is provided by Apache Flink and can be used to read data from and write data to common databases, such as MySQL and PostgreSQL. Source tables, result tables, and dimension tables are supported.

Table 1 Supported types

Type | Description
Supported Table Types | Source table, dimension table, and result table

Prerequisites

Caveats

Syntax

create table jdbcTable (
  attr_name attr_type
  (',' attr_name attr_type)*
  (',' PRIMARY KEY (attr_name, ...) NOT ENFORCED)
  (',' watermark for rowtime_column_name as watermark_strategy_expression)
) with (
  'connector' = 'jdbc',
  'url' = '',
  'table-name' = '',
  'username' = '',
  'password' = ''
);
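The following is a minimal sketch of one concrete instance of the syntax above, assuming a MySQL database. The host 192.168.0.10, database mydb, table orders, and the user name and password are hypothetical placeholders; replace them with your own values.

```sql
-- Hypothetical MySQL table "orders" in database "mydb".
-- Every clause of the syntax template is used: columns, primary key, and watermark.
CREATE TABLE jdbcOrders (
  order_id   BIGINT,
  customer   STRING,
  amount     DECIMAL(10, 2),
  order_time TIMESTAMP(3),
  PRIMARY KEY (order_id) NOT ENFORCED,
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
  'connector'  = 'jdbc',
  'url'        = 'jdbc:mysql://192.168.0.10:3306/mydb',
  'table-name' = 'orders',
  'username'   = 'flink_user',
  'password'   = 'flink_password'
);
```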

Description

Table 2 Parameters

Parameter | Mandatory | Default Value | Data Type | Description
connector | Yes | None | String | Connector to use. Set this parameter to jdbc.
url | Yes | None | String | Database URL. For a MySQL database, the format is jdbc:mysql://MySQL address:MySQL port/Database name. For a PostgreSQL database, the format is jdbc:postgresql://PostgreSQL address:PostgreSQL port/Database name.
table-name | Yes | None | String | Name of the database table to read data from or write data to.
driver | No | None | String | Class name of the JDBC driver used to connect to the database. If this parameter is not set, the driver is derived automatically from the URL. The default driver for MySQL is com.mysql.jdbc.Driver, and the default driver for PostgreSQL is org.postgresql.Driver.
username | No | None | String | User name for database authentication. If this parameter is configured, password must also be configured.
password | No | None | String | Password for database authentication. If this parameter is configured, username must also be configured.
connection.max-retry-timeout | No | 60s | Duration | Maximum timeout between retries. The value must be in second granularity and must be at least 1 second.
scan.partition.column | No | None | String | Name of the column used to partition the input. For details, see Partitioned Scan.
scan.partition.num | No | None | Integer | Number of partitions to create. For details, see Partitioned Scan.
scan.partition.lower-bound | No | None | Integer | Lower bound of the values in the first partition. For details, see Partitioned Scan.
scan.partition.upper-bound | No | None | Integer | Upper bound of the values in the last partition. For details, see Partitioned Scan.
scan.fetch-size | No | 0 | Integer | Number of rows fetched from the database per round trip. If this parameter is set to 0, the fetch-size hint is ignored and the JDBC driver's default is used.
scan.auto-commit | No | true | Boolean | Sets the auto-commit flag of the JDBC driver, which determines whether each statement is committed in a transaction automatically. Some JDBC drivers, such as the PostgreSQL driver, may require this parameter to be set to false to stream results.
lookup.cache.max-rows | No | None | Integer | Maximum number of rows in the lookup cache. When the cache exceeds this value, the oldest rows expire. The lookup cache is disabled by default. For details, see Lookup Cache Functions.
lookup.cache.ttl | No | None | Duration | Maximum time to live of each row in the lookup cache. Rows older than this value expire. The lookup cache is disabled by default. For details, see Lookup Cache Functions.
lookup.cache.caching-missing-key | No | true | Boolean | Whether to cache empty query results. For details, see Lookup Cache Functions.
lookup.max-retries | No | 3 | Integer | Maximum number of retries if a database lookup query fails.
sink.buffer-flush.max-rows | No | 100 | Integer | Maximum number of buffered records before a flush. Set it to 0 to disable this flush trigger.
sink.buffer-flush.interval | No | 1s | Duration | Flush interval. When it elapses, an asynchronous thread flushes the buffered data. Set it to 0 to disable it. To flush buffered records fully asynchronously, set sink.buffer-flush.max-rows to 0 and configure an appropriate flush interval.
sink.max-retries | No | 3 | Integer | Maximum number of retries if writing records to the database fails.
sink.parallelism | No | None | Integer | Parallelism of the JDBC sink operator. By default, the framework determines it by using the same parallelism as the upstream chained operator.
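As a sketch of how the sink parameters in Table 2 combine, the result table below follows the fully asynchronous pattern described for sink.buffer-flush.interval: the row-count trigger is disabled and buffered records are flushed on a timer. In Apache Flink, a JDBC result table that declares a primary key is written in upsert mode. The database, table, and credential names are hypothetical, and the option values are illustrative rather than recommended settings.

```sql
-- Hypothetical MySQL result table "customer_totals".
-- sink.buffer-flush.max-rows = 0 disables the row-count trigger, so data is
-- flushed only by the asynchronous thread every sink.buffer-flush.interval.
CREATE TABLE jdbcSink (
  customer     STRING,
  total_amount DECIMAL(10, 2),
  PRIMARY KEY (customer) NOT ENFORCED
) WITH (
  'connector'                  = 'jdbc',
  'url'                        = 'jdbc:mysql://192.168.0.10:3306/mydb',
  'table-name'                 = 'customer_totals',
  'username'                   = 'flink_user',
  'password'                   = 'flink_password',
  'sink.buffer-flush.max-rows' = '0',
  'sink.buffer-flush.interval' = '2s',
  'sink.max-retries'           = '5',
  'sink.parallelism'           = '2'
);
```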

Partitioned Scan

To accelerate reading data in parallel source task instances, Flink provides the partitioned scan feature for JDBC tables. The scan.partition.* parameters describe how to partition the table when multiple tasks read it in parallel.

  • If any of scan.partition.column, scan.partition.num, scan.partition.lower-bound, and scan.partition.upper-bound is specified when a table is created, all of them must be specified.
  • scan.partition.column must be a numeric, date, or timestamp column of the table.
  • The scan.partition.lower-bound and scan.partition.upper-bound parameters are used to decide the partition stride instead of filtering rows in the table. All rows in the table are partitioned and returned.
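A minimal sketch of a partitioned scan, assuming a hypothetical MySQL table orders whose numeric order_id column ranges roughly from 1 to 1000000. With four partitions, Flink splits this value range into four strides of about 250000 values each, so up to four source tasks can read in parallel.

```sql
-- All four scan.partition.* options are set together, as required.
-- The bounds only determine the stride here: (1000000 - 1 + 1) / 4 = 250000 values per partition.
CREATE TABLE jdbcPartitionedSource (
  order_id BIGINT,
  customer STRING,
  amount   DECIMAL(10, 2)
) WITH (
  'connector'                  = 'jdbc',
  'url'                        = 'jdbc:mysql://192.168.0.10:3306/mydb',
  'table-name'                 = 'orders',
  'username'                   = 'flink_user',
  'password'                   = 'flink_password',
  'scan.partition.column'      = 'order_id',
  'scan.partition.num'         = '4',
  'scan.partition.lower-bound' = '1',
  'scan.partition.upper-bound' = '1000000'
);
```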

Lookup Cache Functions

The JDBC connector can be used as a lookup dimension table in temporal table joins, and currently only supports synchronous lookup mode.

By default, the lookup cache is disabled, so all lookup requests are sent to the external database. To enable it, set both lookup.cache.max-rows and lookup.cache.ttl. The main purpose of the lookup cache is to improve the performance of the JDBC connector in temporal table joins.

When the lookup cache is enabled, each process (that is, each TaskManager) maintains its own cache. Flink looks up the cache first, sends a request to the external database only on a cache miss, and then updates the cache with the returned rows. The oldest rows in the cache expire when the cache reaches the maximum number of rows set by lookup.cache.max-rows or when a row exceeds the time to live set by lookup.cache.ttl. Cached rows may therefore not be the latest. A smaller lookup.cache.ttl gives fresher data but increases the number of requests sent to the database, so you need to balance throughput against correctness.

By default, Flink caches empty query results for primary keys. To disable this behavior, set lookup.cache.caching-missing-key to false.
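The sketch below shows a JDBC table used as a cached dimension table in a processing-time temporal join. It assumes a hypothetical MySQL table customers; the order stream uses Flink's built-in datagen connector and the result goes to the built-in print connector purely for illustration, so replace both with your real source and result tables.

```sql
-- Illustrative order stream; proc_time is the processing-time attribute used by the join.
CREATE TABLE orders (
  order_id    BIGINT,
  customer_id BIGINT,
  proc_time AS PROCTIME()
) WITH (
  'connector'              = 'datagen',
  'rows-per-second'        = '10',
  'fields.customer_id.min' = '1',
  'fields.customer_id.max' = '100'
);

-- Dimension table: setting both cache options enables the lookup cache.
CREATE TABLE customers (
  id   BIGINT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector'             = 'jdbc',
  'url'                   = 'jdbc:mysql://192.168.0.10:3306/mydb',
  'table-name'            = 'customers',
  'username'              = 'flink_user',
  'password'              = 'flink_password',
  'lookup.cache.max-rows' = '5000',
  'lookup.cache.ttl'      = '10min',
  'lookup.max-retries'    = '3'
);

CREATE TABLE printSink (
  order_id      BIGINT,
  customer_name STRING
) WITH (
  'connector' = 'print'
);

-- Each order looks up its customer, hitting the cache first and MySQL on a miss.
INSERT INTO printSink
SELECT o.order_id, c.name
FROM orders AS o
JOIN customers FOR SYSTEM_TIME AS OF o.proc_time AS c
  ON o.customer_id = c.id;
```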

Data Type Mapping

Table 3 Data type mapping

MySQL Type | PostgreSQL Type | Flink SQL Type
TINYINT | - | TINYINT
SMALLINT, TINYINT UNSIGNED | SMALLINT, INT2, SMALLSERIAL, SERIAL2 | SMALLINT
INT, MEDIUMINT, SMALLINT UNSIGNED | INTEGER, SERIAL | INT
BIGINT, INT UNSIGNED | BIGINT, BIGSERIAL | BIGINT
BIGINT UNSIGNED | - | DECIMAL(20, 0)
BIGINT | BIGINT | BIGINT
FLOAT | REAL, FLOAT4 | FLOAT
DOUBLE, DOUBLE PRECISION | FLOAT8, DOUBLE PRECISION | DOUBLE
NUMERIC(p, s), DECIMAL(p, s) | NUMERIC(p, s), DECIMAL(p, s) | DECIMAL(p, s)
BOOLEAN, TINYINT(1) | BOOLEAN | BOOLEAN
DATE | DATE | DATE
TIME [(p)] | TIME [(p)] [WITHOUT TIME ZONE] | TIME [(p)] [WITHOUT TIME ZONE]
DATETIME [(p)] | TIMESTAMP [(p)] [WITHOUT TIME ZONE] | TIMESTAMP [(p)] [WITHOUT TIME ZONE]
CHAR(n), VARCHAR(n), TEXT | CHAR(n), CHARACTER(n), VARCHAR(n), CHARACTER VARYING(n), TEXT | STRING
BINARY, VARBINARY, BLOB | BYTEA | BYTES
- | ARRAY | ARRAY
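To illustrate Table 3, the sketch below declares a Flink table for a hypothetical MySQL table accounts, choosing each Flink column type from the mapping. The MySQL definition in the comment, the table name, and the connection values are assumptions for illustration only.

```sql
-- Hypothetical MySQL definition:
--   CREATE TABLE accounts (
--     id         BIGINT UNSIGNED PRIMARY KEY,
--     is_active  TINYINT(1),
--     balance    DECIMAL(12, 2),
--     nickname   VARCHAR(64),
--     created_at DATETIME(3)
--   );
CREATE TABLE jdbcAccounts (
  id         DECIMAL(20, 0),  -- MySQL BIGINT UNSIGNED
  is_active  BOOLEAN,         -- MySQL TINYINT(1)
  balance    DECIMAL(12, 2),  -- MySQL DECIMAL(p, s)
  nickname   STRING,          -- MySQL VARCHAR(n)
  created_at TIMESTAMP(3),    -- MySQL DATETIME(p)
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector'  = 'jdbc',
  'url'        = 'jdbc:mysql://192.168.0.10:3306/mydb',
  'table-name' = 'accounts',
  'username'   = 'flink_user',
  'password'   = 'flink_password'
);
```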

Example
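A minimal end-to-end sketch: read a hypothetical PostgreSQL table products as a source table and copy it into a hypothetical MySQL result table products_copy with the same columns. Hosts, database names, table names, and credentials are placeholders. scan.auto-commit is set to false because, as noted in Table 2, the PostgreSQL driver may require it to stream results in batches of scan.fetch-size rows.

```sql
-- Source table: hypothetical PostgreSQL table "products" in database "shop".
CREATE TABLE pgProducts (
  product_id   INT,
  product_name STRING,
  price        DECIMAL(10, 2)
) WITH (
  'connector'        = 'jdbc',
  'url'              = 'jdbc:postgresql://192.168.0.20:5432/shop',
  'table-name'       = 'products',
  'username'         = 'pg_user',
  'password'         = 'pg_password',
  'scan.fetch-size'  = '1000',
  'scan.auto-commit' = 'false'
);

-- Result table: hypothetical MySQL table "products_copy"; the primary key enables upsert writes.
CREATE TABLE mysqlProducts (
  product_id   INT,
  product_name STRING,
  price        DECIMAL(10, 2),
  PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
  'connector'  = 'jdbc',
  'url'        = 'jdbc:mysql://192.168.0.10:3306/mydb',
  'table-name' = 'products_copy',
  'username'   = 'flink_user',
  'password'   = 'flink_password'
);

INSERT INTO mysqlProducts
SELECT product_id, product_name, price FROM pgProducts;
```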

FAQ

None