Using Custom UDFs in Flink and StreamPark
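<p>This article is shared from the Tianyi Cloud (天翼云) developer community post "Using Custom UDFs in Flink and StreamPark" (《Flink和StreamPark自定义UDF函数的使用》), author: 王****帅</p><h3 class="md-end-block md-heading md-focus"><span class="md-plain md-expand">1. What is a function</span></h3>
<p><span class="md-plain md-expand">In SQL, data transformations can be wrapped up and embedded in a query so that they are invoked in a uniform way; these are functions. Flink's Table API and SQL both provide functions, but they are invoked slightly differently: in the Table API a function is a method call on a data object, whereas in SQL you reference the function name directly and pass the data as arguments. For example, to convert a string str to upper case, the Table API calls the upperCase() method on str (in the Java Table API, str is a column expression rather than a plain String):</span></p>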
<pre class="language-bash"><code class="language-bash">str.upperCase<span class="token punctuation">(<span class="token punctuation">)<span class="token punctuation">;</span></span></span></code></pre>
<p>In SQL, you reference the UPPER() function directly and pass str as the argument:</p>
<pre class="language-bash"><code class="language-bash">UPPER<span class="token punctuation">(str<span class="token punctuation">)</span></span></code></pre>
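<p>Because the Table API is embedded in the Java language, many of its methods have to be added to classes explicitly, which makes it cumbersome to extend, and it currently supports relatively few functions. The Table API is also less universal than SQL, so it is used less often in practice. The rest of this article therefore focuses on functions in Flink SQL. These fall into two categories: built-in system functions, which are called directly by name and cover common transformations such as COUNT(), CHAR_LENGTH(), and UPPER(); and user-defined functions (UDFs), which must be registered in the table environment before they can be used.</p>
<h3 class="md-end-block md-heading md-focus"><span class="md-plain md-expand">2. What is a user-defined function (UDF)</span></h3>
<p><span class="md-plain md-expand">However extensive the system functions are, they cannot cover every need. When a requirement is not supported by a system function, we implement it with a user-defined function (UDF). The set of built-in functions is in fact still growing: if you believe a function you have written is general enough and widely applicable, you can open an issue in JIRA, the project's issue tracker, asking the Flink development team to add it to the system functions.</span></p>
<h4 class="md-end-block md-heading md-focus"><span class="md-plain md-expand">2.1 Writing a custom UDF</span></h4>
<p><span class="md-plain md-expand">Define a ScalarFunction that takes one String argument and returns that argument's hashCode as a String:</span></p>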
<pre class="language-java"><code class="language-java">package cn.ctyun.demo.flinksql.udf;
import org.apache.flink.table.functions.ScalarFunction;

// Scalar UDF: maps one input value to one output value.
public class HashScalarFunction extends ScalarFunction {
    // Flink calls eval for each row; the result is the hash code as a string.
    public String eval(String str) {
        return String.valueOf(str.hashCode());
    }
}</code></pre>
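<p>To check what values the function should write without starting a Flink job, the body of eval can be exercised as plain Java. The sketch below is only a stand-in: the class name HashEvalDemo and the evalNullSafe variant are hypothetical and not part of the original project, and the class deliberately does not extend ScalarFunction so that it runs without Flink on the classpath.</p>

```java
// Standalone stand-in for HashScalarFunction.eval (hypothetical class name).
// It does not extend ScalarFunction, so it needs no Flink dependency.
final class HashEvalDemo {

    // Same logic as the eval method in HashScalarFunction.
    static String eval(String str) {
        return String.valueOf(str.hashCode());
    }

    // Hypothetical null-tolerant variant (an assumption, not in the original):
    // returns null for a null input instead of throwing NullPointerException.
    static String evalNullSafe(String str) {
        return str == null ? null : String.valueOf(str.hashCode());
    }

    public static void main(String[] args) {
        System.out.println(eval("abc"));        // prints 96354
        System.out.println(eval(""));           // prints 0
        System.out.println(evalNullSafe(null)); // prints null
    }
}
```

<p>Two things follow from this sketch. The result is the decimal form of an int, so it is at most 11 characters long (for example "-2147483648"), which is worth keeping in mind when sizing the name_hash column. And if the name column can contain NULL, the original eval would presumably be called with a null argument and fail on str.hashCode(); a null check like the one in evalNullSafe is one way to avoid that.</p>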
<h4 class="md-end-block md-heading md-focus"><span class="md-plain">2.2 Using the UDF via SQL in code</span></h4>
<h5 class="md-end-block md-heading"><span class="md-plain md-expand">2.2.1 Read MySQL data, transform it with the UDF, and print to the console</span></h5>
<pre class="language-java"><code class="language-java">package cn.ctyun.demo.flinksql;

import cn.ctyun.demo.flinksql.udf.HashScalarFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @Date 2023/4/14 14:38
 * @Description Read MySQL data, transform it with the UDF, and print to the console
 */
public class FlinkSqlUdfMysql2Print {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 1. Create the source table, backed by MySQL
        String source_ddl = "CREATE TABLE UserSource (" +
                " id INT, " +
                " name VARCHAR, " +
                " phone VARCHAR, " +
                " sex INT " +
                ") WITH (" +
                " 'connector.type' = 'jdbc', " +
                " 'connector.url' = 'jdbc:mysql://*******:3306/flink_test_source?useSSL=false', " +
                " 'connector.table' = 'test_user_table', " +
                " 'connector.username' = 'root', " +
                " 'connector.password' = '******'" +
                ")";
        tableEnv.executeSql(source_ddl);

        // 2. Register the custom scalar function
        tableEnv.createTemporarySystemFunction("MyHash", HashScalarFunction.class);

        // 3. Call the UDF in a query to transform the data
        Table resultTable = tableEnv.sqlQuery("select id, name, phone, sex, MyHash(name) as name_hash from UserSource");

        // 4. Print the result to the console
        tableEnv.executeSql("create table output (" +
                "id INT, " +
                "name STRING, " +
                "phone STRING, " +
                "sex INT, " +
                "name_hash STRING ) " +
                "WITH (" +
                "'connector' = 'print')");
        resultTable.executeInsert("output");
    }
}</code></pre>
<h5 class="md-end-block md-heading"><span class="md-plain">2.2.2 Read MySQL data, transform it with the UDF, and write to MySQL</span></h5>
<pre class="language-java"><code class="language-java">package cn.ctyun.demo.flinksql;

import cn.ctyun.demo.flinksql.udf.HashScalarFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @Date 2023/4/14 14:50
 * @Description Read MySQL data, transform it with the UDF, and write to MySQL
 */
public class FlinkSqlUdfMysql2Mysql {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 1. Create the source table, backed by MySQL
        String source_ddl = "CREATE TABLE UserSource (" +
                " id INT, " +
                " name VARCHAR, " +
                " phone VARCHAR, " +
                " sex INT " +
                ") WITH (" +
                " 'connector.type' = 'jdbc', " +
                " 'connector.url' = 'jdbc:mysql://*******:3306/flink_test_source?useSSL=false', " +
                " 'connector.table' = 'test_user_table', " +
                " 'connector.username' = 'root', " +
                " 'connector.password' = '*******'" +
                ")";
        tableEnv.executeSql(source_ddl);

        // 2. Create the sink table, backed by MySQL
        String sink_ddl = "CREATE TABLE UserSink (" +
                "id INT, " +
                "name STRING, " +
                "phone STRING, " +
                "sex INT, " +
                "name_hash STRING " +
                ") WITH (" +
                " 'connector.type' = 'jdbc', " +
                " 'connector.url' = 'jdbc:mysql://*******:3306/flink_test_sink?useSSL=false', " +
                " 'connector.table' = 'test_user_table_udf', " +
                " 'connector.username' = 'root', " +
                " 'connector.password' = '********'" +
                ")";
        tableEnv.executeSql(sink_ddl);

        // 3. Register the custom scalar function
        tableEnv.createTemporarySystemFunction("MyHash", HashScalarFunction.class);

        // 4. Write the data with an INSERT statement; the UDF is applied in its SELECT
        String insertSql = "INSERT INTO UserSink select id, name, phone, sex, MyHash(name) as name_hash from UserSource";
        tableEnv.executeSql(insertSql);
    }
}</code></pre>
<h4 class="md-end-block md-heading md-focus"><span class="md-plain md-expand">2.3 Using the UDF via SQL in StreamPark</span></h4>
<p class="md-end-block md-p"><span class="md-plain">Create a job in StreamPark and import the job dependencies:</span></p>
<p class="md-end-block md-p"><span class="md-plain">flink-connector-jdbc_2.12-1.14.3.jar</span></p>
<p class="md-end-block md-p"><span class="md-plain">flink-demo-jar-job-1.0-SNAPSHOT.jar</span></p>
<p class="md-end-block md-p"><span class="md-plain">mysql-connector-java-8.0.21.jar</span></p>
<p class="md-end-block md-p"><span class="md-plain">The Flink SQL is:</span></p>
<pre class="language-java"><code class="language-java">CREATE FUNCTION MyHash AS 'cn.ctyun.demo.flinksql.udf.HashScalarFunction';

CREATE TABLE UserSource (
    id INT,
    name VARCHAR,
    phone VARCHAR,
    sex INT
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://********:3306/flink_test_source?useSSL=false',
    'connector.table' = 'test_user_table',
    'connector.username' = 'root',
    'connector.password' = '*********'
);

CREATE TABLE UserSink (
    id INT,
    name STRING,
    phone STRING,
    sex INT,
    name_hash STRING
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://*******:3306/flink_test_sink?useSSL=false',
    'connector.table' = 'test_user_table_udf',
    'connector.username' = 'root',
    'connector.password' = '**********'
);

INSERT INTO UserSink select id, name, phone, sex, MyHash(name) as name_hash from UserSource;</code></pre>
<p>After the job runs, the data is inserted into MySQL as expected.</p><br><br>
Source: https://www.cnblogs.com/developer-tianyiyun/p/19079409