Use Flink to Load Data to Relyt
This topic describes how to use Flink to load data into Relyt and provides a complete example for your reference.
1. Preparations
Before you start, ensure that a Flink cluster is available in your environment. If not, configure one by following the instructions in section 1; otherwise, skip to section 2.
1.1 Download dependencies and tools

- Download the following dependencies:

  - Hadoop dependency: https://archive.apache.org/dist/hadoop/common/hadoop-3.1.1/hadoop-3.1.1.tar.gz
  - Flink dependency: https://archive.apache.org/dist/flink/flink-1.14.5/flink-1.14.5-bin-scala_2.12.tgz
  - Java dependency of the needed version: https://www.oracle.com/java/technologies/downloads/#java8

    Alternatively, you can run the following commands (`java-1.8.0-openjdk.x86_64` is used as an example):

    ```shell
    yum -y list java*
    yum install java-1.8.0-openjdk.x86_64

    # Configure the Java environment
    JAVA_HOME=/usr/lib/jvm/jre-1.8.0-openjdk.x86_64
    PATH=$PATH:$JAVA_HOME/bin
    CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
    export JAVA_HOME CLASSPATH PATH
    ```

- Download the PostgreSQL JDBC driver, the hadoop-aws dependency, and the aws-sdk dependency, and save them in the `lib` directory of Flink.
- Download the Relyt CDC driver and save it in the `lib` directory of Flink.
1.2 Configure the environment

- Unzip the Hadoop installation package.
- Go to the Hadoop directory and get the Hadoop `classpath`:

  ```shell
  # Replace /usr/local/hadoop with the actual Hadoop path.
  cd /usr/local/hadoop
  bin/hadoop classpath
  ```

- In the environment variables, configure the dependencies of Hadoop, Java, and Flink:

  ```shell
  # Set the Java environment
  JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-amd64
  PATH=$PATH:$JAVA_HOME/bin
  CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
  export JAVA_HOME CLASSPATH PATH

  export HADOOP_CLASSPATH=/root/testonly/hadoop-3.1.0/etc/hadoop:/root/testonly/hadoop-3.1.0/share/hadoop/common/lib/*:/root/testonly/hadoop-3.1.0/share/hadoop/common/*:/root/testonly/hadoop-3.1.0/share/hadoop/hdfs:/root/testonly/hadoop-3.1.0/share/hadoop/hdfs/lib/*:/root/testonly/hadoop-3.1.0/share/hadoop/hdfs/*:/root/testonly/hadoop-3.1.0/share/hadoop/mapreduce/*:/root/testonly/hadoop-3.1.0/share/hadoop/yarn:/root/testonly/hadoop-3.1.0/share/hadoop/yarn/lib/*:/root/testonly/hadoop-3.1.0/share/hadoop/yarn/*
  export HADOOP_CONF_DIR=/root/testonly/hadoop-3.1.0/etc/hadoop
  ```

  The Hadoop classpath shown here is for reference only. Use the output of `bin/hadoop classpath` from the previous step.
- Modify the Flink configuration file `conf/flink-conf.yaml` to configure the UI and execution parameters. After the configuration is completed, run the following command to start the cluster:

  ```shell
  bin/start-cluster.sh
  ```
- Run the following command to open the Flink SQL client. Then you can start creating Flink tables and performing other SQL operations:

  ```shell
  bin/sql-client.sh
  ```
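The exact contents of `conf/flink-conf.yaml` depend on your deployment. The following is only a sketch of the kind of UI and execution settings referred to above; all values are examples to adapt, not requirements:

```yaml
# Web UI (example values; adapt to your environment)
rest.address: 0.0.0.0
rest.port: 8081

# Execution parameters (example values)
taskmanager.numberOfTaskSlots: 4
parallelism.default: 2

# Enable periodic checkpoints so that sink progress is committed regularly
execution.checkpointing.interval: 60s
```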
2. Create Flink tables
Create a Flink source table and a Flink sink table that corresponds to the destination Relyt table.
```sql
-- Flink source table
CREATE TABLE flink_source (
    id INT,
    content VARCHAR
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '100',
    'number-of-rows' = '500'
);

-- Flink sink table
CREATE TABLE flink_to_relyt (
    a INT,
    b VARCHAR
) WITH (
    'connector' = 'relyt',
    'url' = 'jdbc:postgresql://120.92.109.187:5432/test',
    'username' = '[email protected]',
    'password' = 'test123',
    'sink.basepath' = 'relyt://bucket-name/flink-relyt-connector/',
    'sink.extra' = 'fs.relyt.endpoint=ks3-cn-beijing.ksyuncs.com,fs.relyt.access.key=<access_key>,fs.relyt.secret.key=<secret_key>',
    'table-name' = 'relyt_destination',
    'verbose' = 'true'
);
```
The table names in the example code above are for reference only. Replace them based on your specific needs. For descriptions of the other parameters, see Parameter description.
Parameter description
The following table describes the related parameters.

| Parameter | Required | Description | Example |
| --- | --- | --- | --- |
| connector | Yes | The connector name. It is fixed to `relyt`. | relyt |
| url | Yes | The JDBC URL of Relyt, in the format `jdbc:postgresql://<host_name>:<port>/<database_name>`, where `<host_name>` specifies the host name, `<port>` specifies the port number, and `<database_name>` specifies the name of the Relyt database. | jdbc:postgresql://localhost:10001/postgres |
| username | Yes | The username used to connect to Relyt. | [email protected] |
| password | No | The password of the username. | test123 |
| table-name | Yes | The destination Relyt table to which data is loaded. | relyt_destination |
| sink.basepath | Yes | The temporary directory for storing loaded data. It must start with `relyt://`. | N/A |
| sink.extra | Yes | Other configuration items, including the AK/SK, the endpoint, and the option specifying whether to clear data in the temporary directory. The parameter value can be a list of key-value pairs. Note: If an internal network is used, the endpoint must be set to `internal`. | fs.relyt.endpoint=ks3-cn-beijing.ksyuncs.com,fs.relyt.access.key=<access_key>,fs.relyt.secret.key=<secret_key> |
| sink.async_job_option | Yes | It is fixed to `format=csv,delimiter=","`. | format=csv,delimiter="," |
| sink.max_temp_file_size | No | The maximum size of the temporary directory, in bytes. The default is 96 MB (100663296 bytes). | 100663296 |
| sink.async_load_parallelism | No | The concurrency of asynchronous load tasks. | 2 |
| sink.async_load_queue_size | No | The queue size of asynchronous tasks. When the number of queued tasks exceeds this value, Flink waits for Relyt to complete the current task before initiating a new one. | 4 |
| sink.invalid_encoding_mode | No | Whether to delete illegal characters, such as `0x00`. `0` indicates no and `2` indicates yes. | 2 |
| verbose | No | Whether to output the running log of the connector. `false` indicates no and `true` indicates yes. | false |
Usage notes
The schema of the Flink table must be identical to that of the destination Relyt table, or be a subset of it. In addition, the name and data type of each column in the Flink table must be consistent with those in the destination Relyt table.
The same data type may have different names in Flink and Relyt. The following table provides the mappings between the data types in Flink and Relyt.
| Flink data type | Relyt data type |
| --- | --- |
| CHAR | char |
| VARCHAR | varchar / json (jsonb) |
| STRING | varchar |
| BOOLEAN | boolean |
| BINARY | Not supported |
| VARBINARY | Not supported |
| BYTES | Not supported |
| DECIMAL | decimal |
| TINYINT | smallint |
| SMALLINT | smallint |
| INTEGER | integer |
| BIGINT | bigint |
| FLOAT | float4 |
| DOUBLE | float8 |
| DATE | date |
| TIME | Not supported |
| TIMESTAMP | timestamp |
| TIMESTAMP_LTZ | timestamp |
| INTERVAL | Not supported |
| ARRAY | Not supported |
| MULTISET | Not supported |
| MAP | Not supported |
| ROW | Not supported |
| RAW | Not supported |
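For example, the mappings imply that the sink table from section 2 could pair with a Relyt destination table declared as follows (a sketch; the DDL assumes standard PostgreSQL-compatible column definitions on the Relyt side):

```sql
-- Relyt (PostgreSQL-compatible) destination table
CREATE TABLE relyt_destination (
    a integer,   -- matches Flink INT per the mapping table
    b varchar    -- matches Flink VARCHAR
);
```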
3. Load data to Relyt
Run the following command to load data to Relyt:

```sql
INSERT INTO flink_to_relyt SELECT * FROM flink_source;
```
In this command, `flink_to_relyt` and `flink_source` are examples only. Replace them with the names of your actual Flink sink and source tables.
4. Check the loading progress
You can check the loading progress on Flink or Relyt.
4.1 Check the loading progress on Flink
The web UI offered by Flink allows you to view the status of checkpoints. A successful checkpoint indicates that the corresponding data has been written to the temporary storage directory and the task metadata has been recorded in Relyt.
4.2 Check the loading progress on Relyt
Call `relyt_get_async_load_job()` to view the status of the data loading task:

```sql
SELECT * FROM relyt_get_async_load_job() WHERE target::regclass = 'relyt_destination'::regclass;
```

In this command, `relyt_destination` is only an example. Replace it with the name of the actual destination Relyt table in your environment.
Possible return values include:

- `INIT`: Data is being written to the temporary storage directory.
- `READY`: Data has been written to the temporary storage directory and is ready to be loaded to Relyt.
- `RUNNING`: Data is being loaded from the temporary storage directory to Relyt.
- `FINISH`: The loading task is completed.
- `FAIL`: The loading task failed.
- `CANCEL`: The loading task has been canceled.

When the return value is `FINISH`, the loading task is successfully completed.
5. FAQ
Question 1: What can I do when I encounter an illegal character error while importing tables into Relyt via Flink?
Error details:

```
Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Failed to trigger load for async job [bf82c2b757aa0158f60ccbc5a1a15de0-1-0-1721899242356]. Error message: ERROR: invalid byte sequence for encoding "UTF8": 0x00 (seg2 slice1 10.129.30.103:40000 pid=102620)
```
Cause:

The data you imported contains illegal UTF-8 characters (for example, `0x00`), which Relyt cannot parse properly. As a result, the task failed.
Solutions:
Solution 1: Clean the data at the source to remove illegal characters such as `0x00`, and then import it again.
Solution 2: When creating the Flink sink table, add the `'sink.invalid_encoding_mode'='2'` configuration to the `CREATE TABLE` statement. This automatically removes illegal characters such as `0x00` during the import process. Here is a configuration example:
```sql
CREATE TABLE flink_to_relyt (
    a INT,
    b VARCHAR
) WITH (
    'connector' = 'relyt',
    'url' = 'jdbc:postgresql://<ip>:<port>/test',
    'username' = '<dw_user_name>',
    'password' = '<password>',
    ...
    'sink.invalid_encoding_mode' = '2'
);
```
Question 2: What can I do when I encounter a table attribute mismatch error while importing tables into Relyt via Flink?
Error details:

```
Caused by: cloud.data.relyt.flink.exception.RelytSqlException: Column id with type BIGINT is not compatible with target column Attribute{attname='id', atttypid=23(INT4), attnum=1, atttypmod=-1} in table channel
```
Cause:
The schema of the Flink sink table does not match that of the corresponding table in Relyt.
Solutions:
Modify the schema of the Flink sink table or that of the Relyt table to match each other by referring to the data type mapping table.
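For instance, the error above reports a Flink column `id` of type `BIGINT` writing to a Relyt column of type INT4 (`integer`). Per the mapping table, Flink `INTEGER` maps to Relyt `integer`, so one fix is to redeclare the sink column accordingly (a sketch; the table name is hypothetical, and the other connector options stay as in section 2):

```sql
CREATE TABLE flink_sink_fixed (
    id INT,          -- was BIGINT, which maps to Relyt bigint, not integer (INT4)
    content VARCHAR
) WITH (
    'connector' = 'relyt',
    ...              -- other connector options as in section 2
);
```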