
Use Flink to Load Data to Relyt

This topic will guide you through the process of using Flink to load data into Relyt, providing a comprehensive example for your reference.


1. Preparations

Before you start, ensure that you have a Flink cluster in your environment. If you do not, configure one by following the instructions in this section. Otherwise, skip this section and go to section 2.

1.1 Download dependencies and tools

  1. Download and install Java.

    Alternatively, you can run the following commands (java-1.8.0-openjdk.x86_64 is used as an example):

    # List available Java packages and install OpenJDK 8
    yum -y list java*
    yum install java-1.8.0-openjdk.x86_64

    # Configure the Java environment
    JAVA_HOME=/usr/lib/jvm/jre-1.8.0-openjdk.x86_64
    PATH=$PATH:$JAVA_HOME/bin
    CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
    export JAVA_HOME CLASSPATH PATH

  2. Download the PostgreSQL JDBC driver, the hadoop-aws dependency, and the aws-sdk dependency, and save them in the lib directory of Flink.

  3. Download the Relyt CDC driver and save it in the lib directory of Flink.

1.2 Configure the environment

  1. Unzip the Hadoop installation package.

  2. Go to the Hadoop directory and get the classpath of Hadoop.

    # Replace /usr/local/hadoop with the actual Hadoop path.
    cd /usr/local/hadoop
    bin/hadoop classpath

  3. In the environment variables, configure the dependencies of Hadoop, Java, and Flink:

    #set java environment
    JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-amd64
    PATH=$PATH:$JAVA_HOME/bin
    CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
    export JAVA_HOME CLASSPATH PATH

    export HADOOP_CLASSPATH=/root/testonly/hadoop-3.1.0/etc/hadoop:/root/testonly/hadoop-3.1.0/share/hadoop/common/lib/*:/root/testonly/hadoop-3.1.0/share/hadoop/common/*:/root/testonly/hadoop-3.1.0/share/hadoop/hdfs:/root/testonly/hadoop-3.1.0/share/hadoop/hdfs/lib/*:/root/testonly/hadoop-3.1.0/share/hadoop/hdfs/*:/root/testonly/hadoop-3.1.0/share/hadoop/mapreduce/*:/root/testonly/hadoop-3.1.0/share/hadoop/yarn:/root/testonly/hadoop-3.1.0/share/hadoop/yarn/lib/*:/root/testonly/hadoop-3.1.0/share/hadoop/yarn/*
    export HADOOP_CONF_DIR=/root/testonly/hadoop-3.1.0/etc/hadoop

    The Hadoop classpath provided here is for reference only.

  4. Modify the Flink configuration file conf/flink-conf.yaml, and configure the UI and execution parameters. After the configuration is completed, run the following command to start the cluster:

    bin/start-cluster.sh

  5. Run the following command to open the Flink SQL console. Then you can start creating Flink tables and performing other related SQL operations:

    bin/sql-client.sh

2. Create Flink tables

Create a source Flink table and a sink Flink table that corresponds to Relyt.

-- Flink source table
CREATE TABLE flink_source (
    id INT,
    content VARCHAR
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '100',
    'number-of-rows' = '500'
);

-- Flink sink table
CREATE TABLE flink_to_relyt (
    a INT,
    b VARCHAR
) WITH (
    'connector' = 'relyt',
    'url' = 'jdbc:postgresql://120.92.109.187:5432/test',
    'username' = '[email protected]',
    'password' = 'test123',
    'sink.basepath' = 'relyt://bucket-name/flink-relyt-connector/',
    'sink.extra' = 'fs.relyt.endpoint=ks3-cn-beijing.ksyuncs.com,fs.relyt.access.key=<access_key>,fs.relyt.secret.key=<secret_key>',
    'table-name' = 'relyt_destination',
    'verbose' = 'true'
);
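
The sink table above assumes that a matching destination table already exists in Relyt. A minimal sketch of such a table, assuming the destination columns are also named a and b (the actual names and types must match your Flink sink schema):

```sql
-- Destination table in Relyt; column names and types are assumptions
-- chosen to mirror the Flink sink table flink_to_relyt above.
CREATE TABLE relyt_destination (
    a integer,
    b varchar
);
```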

The table names in the example code above are for reference only. Replace them based on your specific needs. For description of other parameters, see Parameter description.

Parameter description

The following table describes other related parameters.

| Parameter | Required | Description | Example |
| --- | --- | --- | --- |
| connector | Yes | The connector name. It is fixed to relyt. | relyt |
| url | Yes | The URL of Relyt, in the format jdbc:postgresql://<host_name>:<port>/<database_name>, where <host_name> specifies the host name, <port> specifies the port number, and <database_name> specifies the name of the Relyt database. | jdbc:postgresql://localhost:10001/postgres |
| username | Yes | The username used to connect to Relyt. | [email protected] |
| password | No | The password of the username. | test123 |
| table-name | Yes | The destination Relyt table to which data is loaded. | relyt_destination |
| sink.basepath | Yes | The temporary directory for storing loaded data. It must start with relyt://. | N/A |
| sink.extra | Yes | Other configuration items, including the AK/SK, the endpoint, and the option specifying whether to clear data in the temporary directory. The parameter value can be a list of key-value pairs. Note: If an internal network is used, the endpoint must be set to the internal endpoint. | fs.relyt.endpoint=ks3-cn-beijing.ksyuncs.com,fs.relyt.access.key=<access_key>,fs.relyt.secret.key=<secret_key> |
| sink.async_job_option | Yes | It is fixed to format=csv,delimiter=",". | format=csv,delimiter="," |
| sink.max_temp_file_size | No | The maximum size supported by the temporary directory, in bytes. The default is 96 MB. | 100663296 |
| sink.async_load_parallelism | No | The concurrency of asynchronous load tasks. | 2 |
| sink.async_load_queue_size | No | The queue size of asynchronous tasks. When the number of queued tasks exceeds this value, Flink waits for Relyt to complete the current task before initiating a new one. | 4 |
| sink.invalid_encoding_mode | No | Whether to delete illegal characters, such as 0x00. 0 indicates no and 2 indicates yes. | 2 |
| verbose | No | Whether to output the running log of the connector. false indicates no and true indicates yes. | false |
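
All of these parameters go in the WITH clause of the Flink sink table. The following sketch combines the required options with several of the optional ones; every value and placeholder here is illustrative only and must be replaced with settings from your own environment:

```sql
CREATE TABLE flink_to_relyt_tuned (
    a INT,
    b VARCHAR
) WITH (
    'connector' = 'relyt',
    'url' = 'jdbc:postgresql://<host_name>:<port>/<database_name>',
    'username' = '<user_name>',
    'password' = '<password>',
    'table-name' = 'relyt_destination',
    'sink.basepath' = 'relyt://<bucket_name>/flink-relyt-connector/',
    'sink.extra' = 'fs.relyt.endpoint=<endpoint>,fs.relyt.access.key=<access_key>,fs.relyt.secret.key=<secret_key>',
    'sink.async_job_option' = 'format=csv,delimiter=","',
    -- Optional tuning parameters (values are examples, not recommendations):
    'sink.max_temp_file_size' = '100663296',
    'sink.async_load_parallelism' = '2',
    'sink.async_load_queue_size' = '4',
    'verbose' = 'true'
);
```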

Usage notes

The schema of the Flink table must be consistent with that of the destination Relyt table or be a subset of the destination schema. In addition, the name and data type of each column in the Flink table must be consistent with those in the destination Relyt table.

The same data type may have different names in Flink and Relyt. The following table provides the mappings between the data types in Flink and Relyt.

| Flink data type | Relyt data type |
| --- | --- |
| CHAR | char |
| VARCHAR | varchar / json (jsonb) |
| STRING | varchar |
| BOOLEAN | boolean |
| BINARY | Not supported |
| VARBINARY | Not supported |
| BYTES | Not supported |
| DECIMAL | decimal |
| TINYINT | smallint |
| SMALLINT | smallint |
| INTEGER | integer |
| BIGINT | bigint |
| FLOAT | float4 |
| DOUBLE | float8 |
| DATE | date |
| TIME | Not supported |
| TIMESTAMP | timestamp |
| TIMESTAMP_LTZ | timestamp |
| INTERVAL | Not supported |
| ARRAY | Not supported |
| MULTISET | Not supported |
| MAP | Not supported |
| ROW | Not supported |
| RAW | Not supported |
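
To apply the mapping in practice, pick the Relyt type from the right-hand column for each Flink column. A sketch under the assumption of a three-column table (all table and column names here are illustrative):

```sql
-- Flink sink table; connection options are abbreviated for brevity.
CREATE TABLE flink_events (
    id  BIGINT,     -- maps to Relyt bigint
    val DOUBLE,     -- maps to Relyt float8
    ts  TIMESTAMP   -- maps to Relyt timestamp
) WITH (
    'connector' = 'relyt'
    -- plus the connection options described in Parameter description
);

-- Matching destination table in Relyt.
CREATE TABLE flink_events_dest (
    id  bigint,
    val float8,
    ts  timestamp
);
```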


3. Load data to Relyt

Run the following command to load data to Relyt:

INSERT INTO flink_to_relyt SELECT * FROM flink_source;

In this command, flink_to_relyt and flink_source are examples only. Replace them with the actual names of your Flink sink table (the one that corresponds to Relyt) and your Flink source table.


4. Check the loading progress

You can check the loading progress on Flink or Relyt.

4.1 Check the loading progress on Flink

The WebUI offered by Flink allows you to view the status of checkpoints. A successful checkpoint indicates that the corresponding data has been written to the temporary storage directory and that the task metadata has been recorded in Relyt.

4.2 Check the loading progress on Relyt

Call relyt_get_async_load_job() to view the status of the data loading task:

SELECT * FROM relyt_get_async_load_job() WHERE target::regclass = 'relyt_destination'::regclass;

In this command, relyt_destination is only an example. Replace it with the name of the actual destination Relyt table in your environment.

Possible return values include:

  • INIT: Data is being written to the temporary storage directory.

  • READY: Data has been written to the temporary storage directory and is ready to load to Relyt.

  • RUNNING: Data is being loaded from the temporary storage directory to Relyt.

  • FINISH: The loading task is completed.

  • FAIL: The loading task failed.

  • CANCEL: The loading task has been canceled.

When the return value is FINISH, the loading task is successfully completed.
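
If you want to watch only the tasks that have not yet completed, you can filter on the status value. The column name status below is an assumption about the output of relyt_get_async_load_job() and may differ in your Relyt version:

```sql
-- Poll for tasks that are still in flight for the destination table.
-- "status" is an assumed column name; adjust it to the actual output schema.
SELECT *
FROM relyt_get_async_load_job()
WHERE target::regclass = 'relyt_destination'::regclass
  AND status NOT IN ('FINISH', 'FAIL', 'CANCEL');
```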


5. FAQ

Question 1: What can I do when I encounter an illegal character error while importing tables into Relyt via Flink?

Error details:

Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Failed to trigger load for async job [bf82c2b757aa0158f60ccbc5a1a15de0-1-0-1721899242356]. Error message: ERROR: invalid byte sequence for encoding "UTF8": 0x00  (seg2 slice1 10.129.30.103:40000 pid=102620)

Cause:

The data you imported contains illegal UTF-8 characters (e.g., 0x00), which Relyt cannot parse properly. As a result, the task failed.

Solutions:

Solution 1: Clean the data at the source to remove illegal characters like 0x00, and then import it again.

Solution 2: When creating the Flink sink table, add the 'sink.invalid_encoding_mode'='2' configuration in the CREATE TABLE statement. This will automatically remove illegal characters like 0x00 during the import process. Here is a configuration example:

CREATE TABLE flink_to_relyt (
a INT,
b VARCHAR
) WITH (
'connector' = 'relyt',
'url' = 'jdbc:postgresql://<ip>:<port>/test',
'username' = '<dw_user_name>',
'password' = '<password>',
...
'sink.invalid_encoding_mode'='2'
);

Question 2: What can I do when I encounter a table attribute mismatch error while importing tables into Relyt via Flink?

Error details:

Caused by: cloud.data.relyt.flink.exception.RelytSqlException: Column id with type BIGINT is not compatible with target column Attribute{attname='id', atttypid=23(INT4), attnum=1, atttypmod=-1} in table channel

Cause:

The schema of the Flink sink table does not match that of the corresponding table in Relyt.

Solutions:

Modify the schema of the Flink sink table or that of the Relyt table so that the two match, referring to the data type mapping table.
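
For the error shown above (a Flink BIGINT column writing to a Relyt INT4 column named id in table channel), one option is to widen the Relyt column; the other is to narrow the Flink column to INTEGER. A sketch of the first option, assuming Relyt accepts PostgreSQL-style ALTER TABLE syntax:

```sql
-- Widen the Relyt column so it matches the Flink BIGINT column.
ALTER TABLE channel ALTER COLUMN id TYPE bigint;
```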