Common Dimension-Table Join Approaches in Flink: A Handy Reference for Learning and Development
<div id="navCategory"><h5 class="catalogue">Contents</h5><ul class="first_class_ul"><li>Introduction</li><li>
Lookup Join<ul class="second_class_ul"><li>
Synchronous</li><li>
Asynchronous</li></ul></li><li>
State Programming: Preload Data into State and Read on Demand<ul class="second_class_ul"></ul></li><li>
Hot and Cold Data<ul class="second_class_ul"></ul></li><li>
Broadcast Dimension Table<ul class="second_class_ul"></ul></li><li>
Temporal Table Join (Flink SQL and Flink Table API)<ul class="second_class_ul"><li>
Event Time Temporal Join</li><li>
Processing Time Temporal Join</li></ul></li><li>
Lookup Table Join<ul class="second_class_ul"></ul></li><li>
Summary<ul class="second_class_ul"></ul></li></ul></div><p>
<img title="Common Dimension-Table Join Approaches in Flink" alt="Common Dimension-Table Join Approaches in Flink" border="0" height="auto" src="https://zhuji.jb51.net/uploads/img/202305/826ba189d2f4162b848d0e150f7153cd.jpg" width="auto"></p>
<p class="maodian"></p><h2>
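Introduction</h2>
<p>
Real-time data warehouses inevitably need to join streams against dimension tables. This article summarizes several approaches to choose from:</p>
<ul>
<li>
Lookup join (synchronous or asynchronous)</li>
<li>
State programming: preload data into state and read on demand</li>
<li>
Hot and cold data</li>
<li>
Broadcast dimension table</li>
<li>
Temporal Table Join</li>
<li>
Lookup Table Join</li>
</ul>
<p>
Two questions are left open along the way for readers to think about; feel free to discuss them in the comments.</p>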
<p class="maodian"></p><h2>
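Lookup Join</h2>
<p>
A lookup join queries an external store (MySQL, Redis, Impala, ...) directly from within the main stream, fetching dimension values by primary key or some other join condition.</p>
<p>
Best suited for: real-time jobs where the dimension table is large but the main stream's volume is modest.</p>
<p>
Drawback: at high volume this puts heavy load on the external data source, because every record triggers a lookup.</p>
<p class="maodian"></p><h3>
Synchronous</h3>
<p>
The database call is synchronous, so the subtask thread blocks on every lookup, which hurts throughput.</p>
<p>
<img title="Common Dimension-Table Join Approaches in Flink" alt="Common Dimension-Table Join Approaches in Flink" border="0" height="auto" src="https://zhuji.jb51.net/uploads/img/202305/8ec04955b893e1c7d204e3169d870811.jpg" width="auto"></p>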
<ol class="dp-sql">
<li class="alt"><span>import com.alibaba.fastjson.{JSON, JSONArray, JSONObject}</span></li>
<li><span>import com.wang.stream.env.{FlinkStreamEnv, KafkaSourceEnv}</span></li>
<li class="alt"><span>import org.apache.flink.api.common.functions.FlatMapFunction</span></li>
<li><span>import org.apache.flink.api.common.serialization.SimpleStringSchema</span></li>
<li class="alt"><span>import org.apache.flink.streaming.api.scala._</span></li>
<li><span>import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer</span></li>
<li class="alt"><span>import org.apache.flink.util.Collector</span></li>
<li><span> </span></li>
<li class="alt"><span>def analyses(): Unit = {</span></li>
<li><span>  val env: StreamExecutionEnvironment = FlinkStreamEnv.get()</span></li>
<li class="alt"><span>  KafkaSourceEnv.getKafkaSourceStream(env, List("test"))</span></li>
<li><span>    .map(JSON.parseObject(_))</span></li>
<li class="alt"><span>    .filter(_ != null)</span></li>
<li><span>    .flatMap(</span></li>
<li class="alt"><span>      new FlatMapFunction[JSONObject, String] {</span></li>
<li><span>        override def flatMap(jSONObject: JSONObject, collector: Collector[String]): Unit = {</span></li>
<li class="alt"><span>          // If the topic carries a single table there is nothing to distinguish; with several tables, tell them apart by database and table and handle that in the next step</span></li>
<li><span>          // Database name</span></li>
<li class="alt"><span>          val databaseName: String = jSONObject.getString("database")</span></li>
<li><span>          // Table name</span></li>
<li class="alt"><span>          val tableName: String = jSONObject.getString("table")</span></li>
<li><span>          // Operation type: INSERT, UPDATE, DELETE</span></li>
<li class="alt"><span>          val operationType: String = jSONObject.getString("type")</span></li>
<li><span>          // Row data</span></li>
<li class="alt"><span>          val tableData: JSONArray = jSONObject.getJSONArray("data")</span></li>
<li><span>          // Old values</span></li>
<li class="alt"><span>          val old: JSONArray = jSONObject.getJSONArray("old")</span></li>
<li><span>          // Canal JSON may batch several rows into one data array</span></li>
<li class="alt"><span>          for (i &lt;- 0 until tableData.size()) {</span></li>
<li><span>            val data: String = tableData.get(i).toString</span></li>
<li class="alt"><span>            val nObject: JSONObject = JSON.parseObject(data)</span></li>
<li><span>            val orderId: AnyRef = nObject.get("order_id")</span></li>
<li class="alt"><span>            // Connect to MySQL, Redis, or HBase here and look up the dimension row by orderId through its API</span></li>
<li><span>            // Finally, assemble the output record; that record is the join result (null is a placeholder)</span></li>
<li class="alt"><span>            collector.collect(null)</span></li>
<li><span>          }</span></li>
<li class="alt"><span>        }</span></li>
<li><span>      }</span></li>
<li class="alt"><span>    )</span></li>
<li><span>    .addSink(</span></li>
<li class="alt"><span>      new FlinkKafkaProducer[String]("", "", new SimpleStringSchema())</span></li>
<li><span>    )</span></li>
<li class="alt"><span>  env.execute("")</span></li>
<li><span>}</span></li>
</ol>
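<p>
To make the throughput cost of blocking concrete, here is a minimal plain-Scala sketch with no Flink involved. <code>lookupDim</code> and its 10 ms delay are hypothetical stand-ins for a MySQL or Redis call; the point is only that the calling thread waits for each lookup before it can touch the next record.</p>

```scala
object SyncLookupSketch {
  // Hypothetical stand-in for a blocking MySQL/Redis call (assumed latency: 10 ms).
  def lookupDim(orderId: String): String = {
    Thread.sleep(10)
    s"dim-of-$orderId"
  }

  // Enrich records one by one; each lookup blocks the calling thread.
  def enrichAll(orderIds: Seq[String]): Seq[(String, String)] =
    orderIds.map(id => (id, lookupDim(id)))

  def main(args: Array[String]): Unit = {
    val ids = (1 to 20).map(i => s"o$i")
    val start = System.nanoTime()
    val joined = enrichAll(ids)
    val elapsedMs = (System.nanoTime() - start) / 1000000
    // 20 records at 10 ms per lookup: the total is roughly 200 ms or more.
    println(s"joined ${joined.size} records in $elapsedMs ms")
  }
}
```

<p>
Total time grows linearly with the number of records, which is why this style only suits a main stream of modest volume.</p>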
<p class="maodian"></p><h3>
Asynchronous</h3>
<p>
Async I/O handles many requests concurrently, which greatly reduces blocking of the subtask thread.</p>
<p>
<img title="Common Dimension-Table Join Approaches in Flink" alt="Common Dimension-Table Join Approaches in Flink" border="0" height="auto" src="https://zhuji.jb51.net/uploads/img/202305/b939a1b8006dd7023c56e6c66cfdf929.jpg" width="auto"></p>
<ol class="dp-sql">
<li class="alt"><span>// In addition to the imports of the synchronous example:</span></li>
<li><span>import java.util.concurrent.TimeUnit</span></li>
<li class="alt"><span>import org.apache.flink.configuration.Configuration</span></li>
<li><span>import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}</span></li>
<li class="alt"><span> </span></li>
<li><span>def analyses(): Unit = {</span></li>
<li class="alt"><span>  val env: StreamExecutionEnvironment = FlinkStreamEnv.get()</span></li>
<li><span>  val source: DataStream[String] = KafkaSourceEnv.getKafkaSourceStream(env, List("test"))</span></li>
<li class="alt"><span>    .map(JSON.parseObject(_))</span></li>
<li><span>    .filter(_ != null)</span></li>
<li class="alt"><span>    .flatMap(</span></li>
<li><span>      new FlatMapFunction[JSONObject, String] {</span></li>
<li class="alt"><span>        override def flatMap(jSONObject: JSONObject, collector: Collector[String]): Unit = {</span></li>
<li><span>          // If the topic carries a single table there is nothing to distinguish; with several tables, tell them apart by database and table and handle that in the next step</span></li>
<li class="alt"><span>          // Database name</span></li>
<li><span>          val databaseName: String = jSONObject.getString("database")</span></li>
<li class="alt"><span>          // Table name</span></li>
<li><span>          val tableName: String = jSONObject.getString("table")</span></li>
<li class="alt"><span>          // Operation type: INSERT, UPDATE, DELETE</span></li>
<li><span>          val operationType: String = jSONObject.getString("type")</span></li>
<li class="alt"><span>          // Row data</span></li>
<li><span>          val tableData: JSONArray = jSONObject.getJSONArray("data")</span></li>
<li class="alt"><span>          // Old values</span></li>
<li><span>          val old: JSONArray = jSONObject.getJSONArray("old")</span></li>
<li class="alt"><span>          // Canal JSON may batch several rows into one data array</span></li>
<li><span>          for (i &lt;- 0 until tableData.size()) {</span></li>
<li class="alt"><span>            val data: String = tableData.get(i).toString</span></li>
<li><span>            collector.collect(data)</span></li>
<li class="alt"><span>          }</span></li>
<li><span>        }</span></li>
<li class="alt"><span>      }</span></li>
<li><span>    )</span></li>
<li class="alt"><span> </span></li>
<li><span>  AsyncDataStream.unorderedWait(</span></li>
<li class="alt"><span>    source,</span></li>
<li><span>    new RichAsyncFunction[String, String] { // custom asynchronous lookup function</span></li>
<li class="alt"><span>      override def open(parameters: Configuration): Unit = {</span></li>
<li><span>        // Initialize the asynchronous client</span></li>
<li class="alt"><span>      }</span></li>
<li><span>      override def asyncInvoke(input: String, resultFuture: ResultFuture[String]): Unit = {</span></li>
<li class="alt"><span>        // Look up the dimension data without blocking, then emit the joined record;</span></li>
<li><span>        // as a placeholder, the input is passed through unchanged</span></li>
<li class="alt"><span>        resultFuture.complete(Iterable(input))</span></li>
<li><span>      }</span></li>
<li class="alt"><span>      override def close(): Unit = {</span></li>
<li><span>        // Release the client</span></li>
<li class="alt"><span>      }</span></li>
<li><span>    },</span></li>
<li class="alt"><span>    1000, // async timeout</span></li>
<li><span>    TimeUnit.MILLISECONDS, // time unit</span></li>
<li class="alt"><span>    100) // maximum number of in-flight async requests</span></li>
<li><span>    .addSink(</span></li>
<li class="alt"><span>      new FlinkKafkaProducer[String]("", "", new SimpleStringSchema())</span></li>
<li><span>    )</span></li>
<li class="alt"><span> </span></li>
<li><span>  env.execute("")</span></li>
<li class="alt"><span>}</span></li>
</ol>
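<p>
The idea behind asynchronous lookups can be sketched in plain Scala with <code>Future</code>: all requests are issued up front and the results are gathered under one timeout. This is only a rough model of the concept, not of Flink's async operator; <code>lookupDimAsync</code>, the 10 ms latency, and the pool size are assumptions made for the illustration.</p>

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object AsyncLookupSketch {
  // Hypothetical async client: the slow call runs on the given pool,
  // so the caller gets a Future back immediately.
  def lookupDimAsync(orderId: String)(implicit ec: ExecutionContext): Future[String] =
    Future {
      Thread.sleep(10) // assumed lookup latency
      s"dim-of-$orderId"
    }

  // Issue all lookups concurrently, then wait for them with one overall timeout.
  def enrichAll(orderIds: Seq[String], timeout: FiniteDuration)
               (implicit ec: ExecutionContext): Seq[(String, String)] = {
    val futures = orderIds.map(id => lookupDimAsync(id).map(dim => (id, dim)))
    Await.result(Future.sequence(futures), timeout)
  }

  def main(args: Array[String]): Unit = {
    val pool = Executors.newFixedThreadPool(20)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      val joined = enrichAll((1 to 20).map(i => s"o$i"), 1.second)
      println(s"joined ${joined.size} records")
    } finally pool.shutdown()
  }
}
```

<p>
Because the lookups overlap, the wall-clock time is governed by the slowest request rather than by the sum of all requests. The timeout and the bound on concurrent requests play the same role as the <code>1000</code> ms and <code>100</code> arguments passed to <code>unorderedWait</code> above.</p>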
<p class="maodian"></p><h2>
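State Programming: Preload Data into State and Read on Demand</h2>
<p>
First load the dimension table into state, then set a refresh interval and reload the dimension table on a timer.</p>
<p>
<img title="Common Dimension-Table Join Approaches in Flink" alt="Common Dimension-Table Join Approaches in Flink" border="0" height="auto" src="https://zhuji.jb51.net/uploads/img/202305/02e6a02009db0e1d72cf1b0da5aaa8d7.jpg" width="auto"></p>
<p>
Advantage: Flink maintains the dimension data in its own state, so the data shares the job's fate; the job no longer has to hit the external data source frequently, which decouples the two.</p>
<p>
Drawback: not suitable for large dimension tables or for dimension tables that change frequently.</p>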
<ol class="dp-sql">
<li class="alt"><span>// Also needs: import java.util and import scala.collection.JavaConverters._</span></li>
<li><span>.keyBy(_._1)</span></li>
<li class="alt"><span>.process(</span></li>
<li><span>  new KeyedProcessFunction[String, (String, String, String, String, String), String] {</span></li>
<li class="alt"><span>    // Value type is an assumption: the original type parameters were lost in extraction</span></li>
<li><span>    private var mapState: MapState[String, util.Map[String, AnyRef]] = _</span></li>
<li class="alt"><span>    private var first: Boolean = true</span></li>
<li><span> </span></li>
<li class="alt"><span>    override def open(parameters: Configuration): Unit = {</span></li>
<li><span>      val config: StateTtlConfig = StateTtlConfig</span></li>
<li class="alt"><span>        .newBuilder(org.apache.flink.api.common.time.Time.minutes(5))</span></li>
<li><span>        .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)</span></li>
<li class="alt"><span>        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)</span></li>
<li><span>        .build()</span></li>
<li class="alt"><span>      val join = new MapStateDescriptor[String, util.Map[String, AnyRef]]("join", classOf[String], classOf[util.Map[String, AnyRef]])</span></li>
<li><span>      join.enableTimeToLive(config)</span></li>
<li class="alt"><span>      mapState = getRuntimeContext.getMapState(join)</span></li>
<li><span>    }</span></li>
<li class="alt"><span> </span></li>
<li><span>    override def processElement(</span></li>
<li class="alt"><span>        in: (String, String, String, String, String),</span></li>
<li><span>        context: KeyedProcessFunction[String, (String, String, String, String, String), String]#Context,</span></li>
<li class="alt"><span>        collector: Collector[String]): Unit = {</span></li>
<li><span>      // Load the dimension table when the first record arrives</span></li>
<li class="alt"><span>      if (first) {</span></li>
<li><span>        first = false</span></li>
<li class="alt"><span>        val time: Long = System.currentTimeMillis()</span></li>
<li><span>        getSmallDimTableInfo()</span></li>
<li class="alt"><span>        // Schedule the next reload of the dimension table (24 hours later)</span></li>
<li><span>        context.timerService().registerProcessingTimeTimer(time + 86400000)</span></li>
<li class="alt"><span>      }</span></li>
<li><span> </span></li>
<li class="alt"><span>      // Process each incoming record: read the dimension values it needs from mapState according to the business logic</span></li>
<li><span> </span></li>
<li class="alt"><span>      // Then assemble the result and emit it (null is a placeholder)</span></li>
<li><span>      collector.collect(null)</span></li>
<li class="alt"><span>    }</span></li>
<li><span> </span></li>
<li class="alt"><span>    override def onTimer(</span></li>
<li><span>        timestamp: Long,</span></li>
<li class="alt"><span>        ctx: KeyedProcessFunction[String, (String, String, String, String, String), String]#OnTimerContext,</span></li>
<li><span>        out: Collector[String]): Unit = {</span></li>
<li class="alt"><span>      println("timer fired")</span></li>
<li><span>      mapState.clear()</span></li>
<li class="alt"><span>      getSmallDimTableInfo()</span></li>
<li><span>      println(mapState)</span></li>
<li class="alt"><span>      ctx.timerService().registerProcessingTimeTimer(timestamp + 86400000)</span></li>
<li><span>    }</span></li>
<li class="alt"><span> </span></li>
<li><span>    def getSmallDimTableInfo(): Unit = {</span></li>
<li class="alt"><span>      // Load the dictionary data</span></li>
<li><span>      // Note: keyed MapState is scoped to the current key, so this fills the state of the key being processed</span></li>
<li class="alt"><span>      val select_dictionary = "select dic_code,pre_dictionary_id,dic_name from xxxx"</span></li>
<li><span>      val dictionary: util.List[util.Map[String, AnyRef]] = MysqlUtil.executeQuery(select_dictionary, null)</span></li>
<li class="alt"><span>      dictionary.asScala.foreach(item => {</span></li>
<li><span>        mapState.put("dic_dictionary_" + item.get("pre_dictionary_id").toString, item)</span></li>
<li class="alt"><span>      })</span></li>
<li><span>    }</span></li>
<li class="alt"><span>  }</span></li>
<li><span>)</span></li>
<li class="alt"><span>.filter(_ != null)</span></li>
<li><span>.addSink(</span></li>
<li class="alt"><span>  new FlinkKafkaProducer[String]("", "", new SimpleStringSchema())</span></li>
<li><span>)</span></li>
<li class="alt"><span>env.execute("")</span></li>
</ol>
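<p>
Stripped of the Flink plumbing, the preload-and-refresh logic looks roughly like the plain-Scala sketch below. <code>PreloadedDim</code> and its <code>load</code> function are hypothetical names; the class keeps an in-memory snapshot and swaps in a fresh one once the refresh interval has elapsed. The current time is passed in explicitly so that the behaviour is easy to check.</p>

```scala
object PreloadSketch {
  // In-memory copy of a small dimension table, reloaded when older than refreshMs.
  // `load` is a hypothetical loader standing in for a full-table query.
  final class PreloadedDim(load: () => Map[String, String], refreshMs: Long) {
    private var data: Map[String, String] = Map.empty
    private var loadedAt: Long = Long.MinValue // sentinel: nothing loaded yet
    private var loads: Int = 0

    def loadCount: Int = loads

    // `now` is passed in so the refresh logic does not depend on the wall clock.
    def get(key: String, now: Long): Option[String] = {
      if (loadedAt == Long.MinValue || now - loadedAt >= refreshMs) {
        data = load() // swap in a fresh snapshot
        loadedAt = now
        loads += 1
      }
      data.get(key)
    }
  }

  def main(args: Array[String]): Unit = {
    val dim = new PreloadedDim(() => Map("1" -> "apple", "2" -> "pear"), refreshMs = 86400000L)
    println(dim.get("1", now = 0L))        // first call triggers the initial load
    println(dim.get("3", now = 1000L))     // served from memory; the key is absent
    println(dim.get("2", now = 86400000L)) // one day later: reloads, then answers
  }
}
```

<p>
Between two reloads the snapshot can be stale by up to the refresh interval, which is why the approach fits small, slowly changing tables.</p>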
<p>
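Food for thought: what are the pros and cons of simply defining a plain Map collection instead? Feel free to share your view in the comments.</p>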
<p class="maodian"></p><h2>
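Hot and Cold Data</h2>
<p>
Idea: look in state first; on a miss, query the external store and write the result into state. The StateTtlConfig expiry can be kept fairly short.</p>
<p>
<img title="Common Dimension-Table Join Approaches in Flink" alt="Common Dimension-Table Join Approaches in Flink" border="0" height="auto" src="https://zhuji.jb51.net/uploads/img/202305/c25e49c117236a7394b600e7068e6d68.jpg" width="auto"></p>
<p>
Advantage: a middle-ground approach that keeps frequently used (hot) data in memory while avoiding excessive lookups against the external data source.</p>
<p>
Drawback: it is no silver bullet either; too much hot data puts pressure on state, and too much cold data puts pressure on the external database.</p>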
<ol class="dp-sql">
<li class="alt"><span>.filter(_._1 != null)</span></li>
<li><span>.keyBy(_._1)</span></li>
<li class="alt"><span>.process(</span></li>
<li><span>  new KeyedProcessFunction[String, (String, String, String, String, String), String] {</span></li>
<li class="alt"><span>    // Value type is an assumption: the original type parameters were lost in extraction</span></li>
<li><span>    private var mapState: MapState[String, util.Map[String, AnyRef]] = _</span></li>
<li class="alt"><span> </span></li>
<li><span>    override def open(parameters: Configuration): Unit = {</span></li>
<li class="alt"><span>      val config: StateTtlConfig = StateTtlConfig</span></li>
<li><span>        .newBuilder(org.apache.flink.api.common.time.Time.days(1))</span></li>
<li class="alt"><span>        .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)</span></li>
<li><span>        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)</span></li>
<li class="alt"><span>        .build()</span></li>
<li><span>      val join = new MapStateDescriptor[String, util.Map[String, AnyRef]]("join", classOf[String], classOf[util.Map[String, AnyRef]])</span></li>
<li class="alt"><span>      join.enableTimeToLive(config)</span></li>
<li><span>      mapState = getRuntimeContext.getMapState(join)</span></li>
<li class="alt"><span>    }</span></li>
<li><span> </span></li>
<li class="alt"><span>    override def processElement(</span></li>
<li><span>        in: (String, String, String, String, String),</span></li>
<li class="alt"><span>        context: KeyedProcessFunction[String, (String, String, String, String, String), String]#Context,</span></li>
<li><span>        collector: Collector[String]): Unit = {</span></li>
<li class="alt"><span>      // For each incoming record, look in mapState first according to the business logic; on a miss, query the external store</span></li>
<li><span>      if (mapState.contains("xx_id")) {</span></li>
<li class="alt"><span>        // Hit: read the value from state</span></li>
<li><span>      } else {</span></li>
<li class="alt"><span>        // Miss: fetch the row from the external store, then put it into mapState</span></li>
<li><span>        val dim_sql = "select dic_code,pre_dictionary_id,dic_name from xxxx where id=xx_id"</span></li>
<li class="alt"><span>        val dim: util.List[util.Map[String, AnyRef]] = MysqlUtil.executeQuery(dim_sql, null)</span></li>
<li><span>        if (!dim.isEmpty) mapState.put("xx_id", dim.get(0))</span></li>
<li class="alt"><span>      }</span></li>
<li><span>      // Then assemble the result and emit it (null is a placeholder)</span></li>
<li class="alt"><span>      collector.collect(null)</span></li>
<li><span>    }</span></li>
<li class="alt"><span>  }</span></li>
<li><span>)</span></li>
</ol>
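<p>
The hot/cold idea is essentially a read-through cache with expiry. The plain-Scala sketch below shows that logic in isolation; <code>HotColdLookup</code> and <code>loadCold</code> are hypothetical names, and expiry is counted from the time an entry was written, which is simpler than the <code>OnReadAndWrite</code> setting used in the Flink code.</p>

```scala
import scala.collection.mutable

object HotColdSketch {
  // Read-through cache: serve hot keys from memory and fall back to `loadCold`
  // on a miss or once the entry is older than ttlMs.
  // `loadCold` is a hypothetical external lookup (for example a database query).
  final class HotColdLookup(loadCold: String => Option[String], ttlMs: Long) {
    private val hot = mutable.Map.empty[String, (String, Long)] // key -> (value, storedAt)
    private var coldHits = 0

    def coldLookups: Int = coldHits

    def get(key: String, now: Long): Option[String] =
      hot.get(key) match {
        case Some((value, storedAt)) if now - storedAt < ttlMs =>
          Some(value) // hot path: no external call
        case _ =>
          coldHits += 1
          val loaded = loadCold(key)
          loaded match {
            case Some(v) => hot.put(key, (v, now)) // remember it for later reads
            case None    => hot.remove(key)        // drop any expired entry
          }
          loaded
      }
  }

  def main(args: Array[String]): Unit = {
    val lookup = new HotColdLookup(k => if (k == "a") Some("A") else None, ttlMs = 10L)
    println(lookup.get("a", now = 0L))  // miss: goes to the cold store
    println(lookup.get("a", now = 5L))  // hit: served from memory
    println(lookup.get("a", now = 10L)) // expired: goes to the cold store again
    println(s"cold lookups: ${lookup.coldLookups}")
  }
}
```

<p>
A shorter TTL keeps the hot set small and fresh at the cost of more external calls; a longer one does the opposite.</p>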
<p class="maodian"></p><h2>
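Broadcast Dimension Table</h2>
<p>
Take the dictionary table mentioned above: every task needs a copy of it, so when a stream has to be joined against such data, the dimension table can be broadcast.</p>
<p>
<img title="Common Dimension-Table Join Approaches in Flink" alt="Common Dimension-Table Join Approaches in Flink" border="0" height="auto" src="https://zhuji.jb51.net/uploads/img/202305/53b5775e468c56aec203f45027a4d94d.jpg" width="auto"></p>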
<ol class="dp-sql">
<li class="alt"><span>val dimStream = env.addSource(MysqlSource)</span></li>
<li><span> </span></li>
<li class="alt"><span>// Broadcast state descriptor (the value type is an assumption; the original type parameters were lost in extraction)</span></li>
<li><span>val broadcastStateDesc = new MapStateDescriptor[String, util.Map[java.lang.Long, Dim]]("broadcaststate", BasicTypeInfo.STRING_TYPE_INFO, new MapTypeInfo(classOf[java.lang.Long], classOf[Dim]))</span></li>
<li class="alt"><span> </span></li>
<li><span>// Broadcast stream; passing the descriptor is what makes broadcast state available downstream</span></li>
<li class="alt"><span>val broadStream = dimStream.broadcast(broadcastStateDesc)</span></li>
<li><span> </span></li>
<li class="alt"><span>// Main stream</span></li>
<li><span>val mainConsumer = new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), kafkaConfig)</span></li>
<li class="alt"><span> </span></li>
<li><span>val mainStream = env.addSource(mainConsumer)</span></li>
<li class="alt"><span> </span></li>
<li><span>// Key the main stream, then connect it to the broadcast dimension stream</span></li>
<li class="alt"><span>val connectedStream = mainStream.map(/* ... parse into User(id, name) ... */).keyBy(_.id).connect(broadStream)</span></li>
<li><span> </span></li>
<li class="alt"><span>connectedStream.process(new KeyedBroadcastProcessFunction[String, User, util.Map[java.lang.Long, Dim], String] {</span></li>
<li><span> </span></li>
<li class="alt"><span>  override def processElement(value: User, ctx: KeyedBroadcastProcessFunction[String, User, util.Map[java.lang.Long, Dim], String]#ReadOnlyContext, out: Collector[String]): Unit = {</span></li>
<li><span>    // With the broadcast state in hand, look up the dimension data and do the join</span></li>
<li class="alt"><span>    val state = ctx.getBroadcastState(broadcastStateDesc)</span></li>
<li><span>    // ...</span></li>
<li class="alt"><span>  }</span></li>
<li><span> </span></li>
<li class="alt"><span>  override def processBroadcastElement(value: util.Map[java.lang.Long, Dim], ctx: KeyedBroadcastProcessFunction[String, User, util.Map[java.lang.Long, Dim], String]#Context, out: Collector[String]): Unit = {</span></li>
<li><span>    // Store the latest dimension data in broadcast state (the key "dim" is illustrative)</span></li>
<li class="alt"><span>    ctx.getBroadcastState(broadcastStateDesc).put("dim", value)</span></li>
<li><span>  }</span></li>
<li class="alt"><span>})</span></li>
</ol>
<p>
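Food for thought: what if the dimension table is also streamed into Kafka by monitoring its binlog in real time, so that every change to the dimension data updates the state? Wouldn't that be more timely?</p>
<p>
(1) Use Canal to ship the binlog changes to Kafka.</p>
<p>
(2) Declare that stream as a broadcast stream and broadcast its data to the main stream.</p>
<p>
(3) Define a broadcast state to hold the data and look up matches while processing the main stream; a record that meets the condition is joined.</p>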
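<p>
Steps (1) to (3) boil down to folding change events into a dimension snapshot and probing that snapshot from the main stream. A minimal plain-Scala sketch of that logic follows; the <code>Change</code> case class is a simplified, hypothetical stand-in for a Canal message, and an immutable <code>Map</code> plays the role of the broadcast state.</p>

```scala
object ChangelogDimSketch {
  // Simplified change event; op is "INSERT", "UPDATE" or "DELETE",
  // mirroring the canal `type` field read in the earlier examples.
  final case class Change(op: String, id: String, name: String)

  // Fold one change event into the current dimension snapshot.
  def applyChange(dim: Map[String, String], c: Change): Map[String, String] =
    c.op match {
      case "INSERT" | "UPDATE" => dim + (c.id -> c.name)
      case "DELETE"            => dim - c.id
      case _                   => dim // ignore unknown operation types
    }

  // Join a main-stream record (userId, payload) against the snapshot;
  // None means no matching dimension row.
  def join(dim: Map[String, String], userId: String, payload: String): Option[(String, String, String)] =
    dim.get(userId).map(name => (userId, name, payload))

  def main(args: Array[String]): Unit = {
    val changes = Seq(
      Change("INSERT", "1", "alice"),
      Change("UPDATE", "1", "alicia"),
      Change("INSERT", "2", "bob"),
      Change("DELETE", "2", "bob"))
    val dim = changes.foldLeft(Map.empty[String, String])(applyChange)
    println(join(dim, "1", "order-42")) // matches the updated row
    println(join(dim, "2", "order-43")) // row was deleted, so no match
  }
}
```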
<p class="maodian"></p><h2>
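Temporal Table Join (Flink SQL and Flink Table API)</h2>
<p>
A dimension table is a table that keeps changing (a static table is just a special case of a dynamic table). So how do you JOIN against a table that keeps changing? Expressing a dimension-table JOIN with the traditional JOIN syntax is incomplete: since the dimension table is continuously updated, which point in time of the dimension table does the join refer to? We cannot tell, so the result is nondeterministic. That is why Flink SQL's dimension-table JOIN syntax adopts the standard Temporal Table syntax, which declares which snapshot in time of the dimension table is being joined.</p>
<p>
A regular join keeps the data of both sides in state indefinitely, so state keeps growing until it exhausts memory and the job fails. A temporal join, by contrast, can periodically clean up expired data, so with a sensible memory configuration it avoids running out of memory.</p>
<p>
<img title="Common Dimension-Table Join Approaches in Flink" alt="Common Dimension-Table Join Approaches in Flink" border="0" height="auto" src="https://zhuji.jb51.net/uploads/img/202305/52a5fc6afd729e4ec34b62db36da4949.jpg" width="auto"></p>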
<p class="maodian"></p><h3>
Event Time Temporal Join</h3>
<p>
<strong>Syntax</strong></p>
<ol class="dp-sql">
<li class="alt"><span>SELECT [column_list]</span></li>
<li><span>FROM table1 [AS &lt;alias1&gt;]</span></li>
<li class="alt"><span>[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS &lt;alias2&gt;]</span></li>
<li><span>ON table1.column-name1 = table2.column-name1</span></li>
</ol>
<p>
With an event-time attribute (that is, a rowtime attribute), the value of a key can be retrieved as it was at some point in the past. This makes it possible to join two tables at a common point in time.</p>
<p>
<strong>Example</strong></p>
<p>
Suppose we have a table of orders, each with a price in a different currency. To normalize this table to a single currency correctly, each order needs to be joined with the conversion rate that applied at the time the order was placed.</p>
<ol class="dp-sql">
<li class="alt"><span>CREATE TABLE orders (</span></li>
<li><span>    order_id STRING,</span></li>
<li class="alt"><span>    price DECIMAL(32,2),</span></li>
<li><span>    currency STRING,</span></li>
<li class="alt"><span>    order_time TIMESTAMP(3),</span></li>
<li><span>    WATERMARK FOR order_time AS order_time</span></li>
<li class="alt"><span>) WITH (/* ... */);</span></li>
<li><span> </span></li>
<li class="alt"><span>CREATE TABLE currency_rates (</span></li>
<li><span>    currency STRING,</span></li>
<li class="alt"><span>    conversion_rate DECIMAL(32, 2),</span></li>
<li><span>    update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL,</span></li>
<li class="alt"><span>    WATERMARK FOR update_time AS update_time,</span></li>
<li><span>    PRIMARY KEY(currency) NOT ENFORCED</span></li>
<li class="alt"><span>) WITH (</span></li>
<li><span>    'connector' = 'upsert-kafka',</span></li>
<li class="alt"><span>    /* ... */</span></li>
<li><span>);</span></li>
<li class="alt"><span> </span></li>
<li><span>-- An event-time temporal join requires the primary key to be contained in the equality condition of the temporal join condition</span></li>
<li class="alt"><span> </span></li>
<li><span>SELECT</span></li>
<li class="alt"><span>    order_id,</span></li>
<li><span>    price,</span></li>
<li class="alt"><span>    currency,</span></li>
<li><span>    conversion_rate,</span></li>
<li class="alt"><span>    order_time</span></li>
<li><span>FROM orders</span></li>
<li class="alt"><span>LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time</span></li>
<li><span>ON orders.currency = currency_rates.currency</span></li>
</ol>
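<p>
What "the rate as of the order time" means can be illustrated outside of SQL. The plain-Scala sketch below keeps a version history per currency and returns the latest version whose update time is not later than the order time. <code>VersionedRates</code> is a hypothetical helper written for this illustration and says nothing about how Flink stores versions internally; timestamps are plain epoch milliseconds.</p>

```scala
import java.util.TreeMap
import scala.collection.mutable

object TemporalJoinSketch {
  // Version history per currency: update time (epoch millis) -> conversion rate.
  final class VersionedRates {
    private val history = mutable.Map.empty[String, TreeMap[Long, BigDecimal]]

    // Record that `rate` became valid for `currency` at `updateTime`.
    def update(currency: String, updateTime: Long, rate: BigDecimal): Unit = {
      history.getOrElseUpdate(currency, new TreeMap[Long, BigDecimal]()).put(updateTime, rate)
      ()
    }

    // Rate valid at `asOf`: the latest version with updateTime <= asOf, if any.
    def rateAsOf(currency: String, asOf: Long): Option[BigDecimal] =
      history.get(currency)
        .flatMap(versions => Option(versions.floorEntry(asOf)))
        .map(_.getValue)
  }

  def main(args: Array[String]): Unit = {
    val rates = new VersionedRates
    rates.update("EUR", 100L, BigDecimal("1.10"))
    rates.update("EUR", 200L, BigDecimal("1.20"))
    // An order placed at t=150 sees the version written at t=100.
    println(rates.rateAsOf("EUR", 150L))
    // An order placed before any version exists finds no rate (NULL in a LEFT JOIN).
    println(rates.rateAsOf("EUR", 50L))
  }
}
```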
<p class="maodian"></p><h3>
Processing Time Temporal Join</h3>
<p>
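A processing-time temporal table join uses a processing-time attribute to correlate rows with the latest version of a key in an external versioned table.</p>
<p>
By definition, with a processing-time attribute the join always returns the most up-to-date value for a given key. The lookup table can be thought of as a simple HashMap that stores all the records from the build side. The strength of this join is that it lets Flink work directly against external systems when it is not feasible to materialize the table as a dynamic table within Flink.</p>
<p>
FOR SYSTEM_TIME AS OF table1.proctime means that when a record from the left table is joined with the dimension table on the right, it matches only the dimension-table snapshot as of the current processing time.</p>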
<p class="maodian"></p><h2>
Lookup Table Join</h2>
<p>
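A lookup join is typically used to enrich a table with data from an external table (the dimension table). It requires one table to have a processing-time attribute and the other table to be backed by a lookup source connector.</p>
<p>
The JDBC connector can be used in a temporal join as a lookup source (also known as a dimension table). The syntax used is that of temporal joins.</p>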
<ol class="dp-sql">
<li class="alt"><span>s"""</span></li>
<li><span>   |CREATE TABLE users(</span></li>
<li class="alt"><span>   |id int,</span></li>
<li><span>   |name string,</span></li>
<li class="alt"><span>   |PRIMARY KEY (id) NOT ENFORCED</span></li>
<li><span>   |)</span></li>
<li class="alt"><span>   |WITH (</span></li>
<li><span>   |'connector' = 'jdbc',</span></li>
<li class="alt"><span>   |'url' = 'xxxx',</span></li>
<li><span>   |'driver'='$DRIVER_CLASS_NAME',</span></li>
<li class="alt"><span>   |'table-name'='$tableName',</span></li>
<li><span>   |'lookup.cache.max-rows'='100',</span></li>
<li class="alt"><span>   |'lookup.cache.ttl'='30s'</span></li>
<li><span>   |)</span></li>
<li class="alt"><span>   |""".stripMargin</span></li>
<li><span> </span></li>
<li class="alt"><span>s"""</span></li>
<li><span>   |CREATE TABLE car(</span></li>
<li class="alt"><span>   |`id` bigint,</span></li>
<li><span>   |`user_id` bigint,</span></li>
<li class="alt"><span>   |`proctime` as PROCTIME()</span></li>
<li><span>   |)</span></li>
<li class="alt"><span>   |WITH (</span></li>
<li><span>   |    'connector' = 'kafka',</span></li>
<li class="alt"><span>   |    'topic' = '$topic',</span></li>
<li><span>   |    'scan.startup.mode' = 'latest-offset',</span></li>
<li class="alt"><span>   |    'properties.bootstrap.servers' = '$KAFKA_SERVICE',</span></li>
<li><span>   |    'properties.group.id' = 'indicator',</span></li>
<li class="alt"><span>   |    'format' = 'canal-json'</span></li>
<li><span>   |)</span></li>
<li class="alt"><span>   |""".stripMargin</span></li>
<li><span> </span></li>
<li class="alt"><span>SELECT</span></li>
<li><span>    mc.user_id user_id,</span></li>
<li class="alt"><span>    count(1) AS `value`</span></li>
<li><span>FROM car mc</span></li>
<li class="alt"><span>    inner join users FOR SYSTEM_TIME AS OF mc.proctime as u on mc.user_id=u.id</span></li>
<li><span>group by mc.user_id</span></li>
</ol>
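<p>
The two cache options in the <code>users</code> table, <code>lookup.cache.max-rows</code> and <code>lookup.cache.ttl</code>, bound the cache by size and by age. The plain-Scala sketch below models what those two limits mean; it is an assumption-laden illustration, not the connector's actual implementation, and <code>BoundedTtlCache</code> is a hypothetical name.</p>

```scala
import java.util.{LinkedHashMap, Map => JMap}

object LookupCacheSketch {
  // Cache bounded by row count and by entry age, modelling the meaning of
  // lookup.cache.max-rows and lookup.cache.ttl (not the connector's own code).
  final class BoundedTtlCache[K, V](maxRows: Int, ttlMs: Long) {
    // accessOrder = true keeps entries ordered from least to most recently used.
    private val entries = new LinkedHashMap[K, (V, Long)](16, 0.75f, true) {
      override def removeEldestEntry(eldest: JMap.Entry[K, (V, Long)]): Boolean =
        size() > maxRows // evict the least recently used row when over capacity
    }

    def put(key: K, value: V, now: Long): Unit = {
      entries.put(key, (value, now))
      ()
    }

    def get(key: K, now: Long): Option[V] =
      Option(entries.get(key)) match {
        case Some((value, storedAt)) if now - storedAt < ttlMs => Some(value)
        case Some(_) =>
          entries.remove(key) // expired: drop it so the caller queries the database
          None
        case None => None
      }
  }

  def main(args: Array[String]): Unit = {
    val cache = new BoundedTtlCache[Int, String](maxRows = 2, ttlMs = 30000L)
    cache.put(1, "alice", now = 0L)
    cache.put(2, "bob", now = 0L)
    println(cache.get(1, now = 1000L))  // hit; key 1 becomes most recently used
    cache.put(3, "carol", now = 1000L)  // over capacity: key 2 is evicted
    println(cache.get(2, now = 1000L))  // miss after eviction
    println(cache.get(1, now = 30000L)) // miss after the 30 s TTL has passed
  }
}
```

<p>
A cache hit saves a round trip to the database but may return a row that is up to one TTL old, so the two options trade freshness against load on the external store.</p>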
<p class="maodian"></p><h2>
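Summary</h2>
<p>
Overall, there are four basic ways to join against a dimension table:</p>
<p>
(1) Looking up an external data source</p>
<p>
(2) Preloading the dimension table (in memory or in state)</p>
<p>
(3) Keeping hot and cold data (effectively a combination of 1 and 2)</p>
<p>
(4) Joining against the dimension table's changelog (through broadcast or any other stream-to-stream join)</p>
<p>
Factors to weigh together: throughput, timeliness, load on the external data source, memory resources, decoupling, and so on.</p>
<p>
None of the four approaches is a once-and-for-all solution; each is a trade-off among these metrics for a given business scenario, so pick the one that fits your case.</p>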
<p>
Original article: https://mp.weixin.qq.com/s/XkQW8Kjvs5FMbMlIn58YFw</p>