吕建华 發表於 2025-5-9 10:12:00

HTTP接口数据也能定时同步入湖?用DolphinScheduler×SeaTunnel快速搞定!

<h2 id="背景与目标">背景与目标</h2>
<p>我们之前曾评估使用过SeaTunnel做CDC入湖验证:SeaTunnel-CDC入湖实践,这些场景都是能直连数据库的场景,业务需求中经常会出现无法直连数据库做CDC进行数据同步的场景,而这些场景就需要使用API进行数据对接,用Apache DolphinScheduler定时同步数据。</p>
<p>举个实际中的例子:</p>
<ul>
<li>ERP(SAP)的库存数据进行同步入湖仓做库存分析</li>
</ul>
<p>同时,本次目标希望其他同事能依样画葫芦,在以后的对接http接口到湖仓的时候能够自行完成,而非每遇到一个对接需求,就需要通过代码方式进行对接。</p>
<h2 id="准备工作">准备工作</h2>
<ul>
<li>seatunnel 2.3.10</li>
</ul>
<p>首先,您需要在<code>${SEATUNNEL_HOME}/config/plugin_config</code>文件中加入连接器名称,然后,执行命令来安装连接器,确认连接器在${SEATUNNEL_HOME}/connectors/目录下即可。</p>
<p>本例中我们会用到:<code>connector-jdbc</code>、<code>connector-paimon</code></p>
<p>写入StarRocks也可以使用<code>connector-starrocks</code>,本例中的场景比较适合用<code>connector-jdbc</code>,所以使用<code>connector-jdbc</code>。</p>
<pre><code># 配置连接器名称
--connectors-v2--
connector-jdbc
connector-starrocks
connector-paimon
--end--
</code></pre>
<pre><code># 安装连接器
sh bin/install-plugin.sh 2.3.10
</code></pre>
<h2 id="seatunnel任务">SeaTunnel任务</h2>
<p>我们先至少保证能在本地完成SeaTunnel任务,再完成对Apache DolphinScheduler的对接。</p>
<ul>
<li>http to starRocks<br>
<code>example/http2starrocks</code></li>
</ul>
<pre><code>env {
parallelism = 1
job.mode = "BATCH"
}

source {
Http {
    plugin_output = "stock"
    url = "https://ip/http/prd/query_sap_stock"
    method = "POST"
    headers {
      Authorization = "Basic XXX"
      Content-Type = "application/json"
    }
    body = """{"IT_WERKS": [{"VALUE": "1080"}]}"""
    format = "json"
    content_field = "$.ET_RETURN.*"
    schema {
      fields {
      MATNR = "string"
      MAKTX = "string"
      WERKS = "string"
      NAME1 = "string"
      LGORT = "string"
      LGOBE = "string"
      CHARG = "string"
      MEINS = "string"
      LABST = "double"
      UMLME = "double"
      INSME = "double"
      EINME = "double"
      SPEME = "double"
      RETME = "double"
      }
    }
}
}

# 此转换操作主要用于字段从命名等方便用途
transform {
Sql {
    plugin_input = "stock"
    plugin_output = "stock-tf-out"
    query = "select MATNR, MAKTX, WERKS,NAME1,LGORT,LGOBE,CHARG,MEINS,LABST,UMLME,INSME,EINME,SPEME,RETME from stock"
}
}

# 连接starRocks 进行数据分区覆写,本例适用starRocks建表,按照分区insert overwrite 覆写
sink {
    jdbc {
      plugin_input = "stock-tf-out"
      url = "jdbc:mysql://XXX:9030/scm?useUnicode=true&amp;characterEncoding=UTF-8&amp;rewriteBatchedStatements=true"
      driver = "com.mysql.cj.jdbc.Driver"
      user = "lab"
      password = "XXX"
      compatible_mode="starrocks"
      query = """insert overwrite ods_sap_stock PARTITION (WERKS='1080') (MATNR, MAKTX, WERKS,NAME1,LGORT,LGOBE,CHARG,MEINS,LABST,UMLME,INSME,EINME,SPEME,RETME) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)"""
      }
}

# connector-starrocks进行对接 (未看到支持sql语句进行数据insert overwrite,本例子场景不适合),比较适合表数据全部删除重建场景
// sink {
//   StarRocks {
//   plugin_input = "stock-tf-out"
//   nodeUrls = ["ip:8030"]
//   base-url = "jdbc:mysql://ip:9030/"
//   username = "lab"
//   password = "XXX"
//   database = "scm"
//   table = "ods_sap_stock"
//   batch_max_rows = 1000
//   data_save_mode="DROP_DATA"
//   starrocks.config = {
//       format = "JSON"
//       strip_outer_array = true
//   }
//   schema_save_mode = "RECREATE_SCHEMA"
//   save_mode_create_template="""
//       CREATE TABLE IF NOT EXISTS `scm`.`ods_sap_stock` (
//         MATNR STRING COMMENT '物料',
//         WERKS STRING COMMENT '工厂',
//         LGORT STRING COMMENT '库存地点',
//         MAKTX STRING COMMENT '物料描述',
//         NAME1 STRING COMMENT '工厂名称',
//         LGOBE STRING COMMENT '地点描述',
//         CHARG STRING COMMENT '批次编号',
//         MEINS STRING COMMENT '单位',
//         LABST DOUBLE COMMENT '非限制使用库存',
//         UMLME DOUBLE COMMENT '在途库存',
//         INSME DOUBLE COMMENT '质检库存',
//         EINME DOUBLE COMMENT '受限制使用的库存',
//         SPEME DOUBLE COMMENT '已冻结的库存',
//         RETME DOUBLE COMMENT '退货'
//       ) ENGINE=OLAP
//       PRIMARY KEY ( MATNR,WERKS,LGORT)
//       COMMENT 'sap库存'
//       DISTRIBUTED BY HASH (WERKS) PROPERTIES (
//       "replication_num" = "1"
//       )
//   """
//   }
// }
</code></pre>
<ul>
<li>http to paimon<br>
<code>example/http2paimon</code></li>
</ul>
<pre><code>env {
parallelism = 1
job.mode = "BATCH"
}

source {
Http {
    plugin_output = "stock"
    url = "https://ip/http/prd/query_sap_stock"
    method = "POST"
    headers {
      Authorization = "Basic XXX"
      Content-Type = "application/json"
    }
    body = """{"IT_WERKS": [{"VALUE": "1080"}]}"""
    format = "json"
    content_field = "$.ET_RETURN.*"
    schema {
      fields {
      MATNR = "string"
      MAKTX = "string"
      WERKS = "string"
      NAME1 = "string"
      LGORT = "string"
      LGOBE = "string"
      CHARG = "string"
      MEINS = "string"
      LABST = "double"
      UMLME = "double"
      INSME = "double"
      EINME = "double"
      SPEME = "double"
      RETME = "double"
      }
    }
}
}
# 此转换操作主要用于字段从命名等方便用途
transform {
Sql {
    plugin_input = "stock"
    plugin_output = "stock-tf-out"
    query = "select MATNR, MAKTX, WERKS,NAME1,LGORT,LGOBE,CHARG,MEINS,LABST,UMLME,INSME,EINME,SPEME,RETME from stock"
}
}

# 连接paimon进行数据同步,paimon 暂时 未看到有支持 insert overwrite 分区覆写,此例仅作为参考,不适用本此例子需求
sink {
Paimon {
    warehouse = "s3a://test/"
    database = "sap"
    table = "ods_sap_stock"
    paimon.hadoop.conf = {
      fs.s3a.access-key=XXX
      fs.s3a.secret-key=XXX
      fs.s3a.endpoint="http://minio:9000"
      fs.s3a.path.style.access=true
      fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
    }
}
}

</code></pre>
<h2 id="dolphinscheduler集成seatunnel">DolphinScheduler集成SeaTunnel</h2>
<ul>
<li>制作worker镜像</li>
</ul>
<pre><code>FROM dolphinscheduler.docker.scarf.sh/apache/dolphinscheduler-worker:3.2.2
RUN mkdir /opt/seatunnel
RUN mkdir /opt/seatunnel/apache-seatunnel-2.3.10
# 容器集成seatunnel
COPY apache-seatunnel-2.3.10/ /opt/seatunnel/apache-seatunnel-2.3.10/
</code></pre>
<p>打包镜像,推送到镜像仓库</p>
<pre><code>docker build --platform=linux/amd64 -t apache/dolphinscheduler-worker:3.2.2-seatunnel .
</code></pre>
<ul>
<li>使用新镜像部署一个worker,此处修改<code>docker-compose.yaml</code>,增加一个<code>dolphinscheduler-worker-seatunnel</code>节点。</li>
</ul>
<pre><code>...
dolphinscheduler-worker-seatunnel:
    image: xxx/dolphinscheduler-worker:3.2.2-seatunnel
    profiles: ["all"]
    env_file: .env
    healthcheck:
      test: [ "CMD", "curl", "http://localhost:1235/actuator/health" ]
      interval: 30s
      timeout: 5s
      retries: 3
    depends_on:
      dolphinscheduler-zookeeper:
      condition: service_healthy
    volumes:
      - ./dolphinscheduler-worker-seatunnel-data:/tmp/dolphinscheduler
      - ./dolphinscheduler-logs:/opt/dolphinscheduler/logs
      - ./dolphinscheduler-shared-local:/opt/soft
      - ./dolphinscheduler-resource-local:/dolphinscheduler
    networks:
      dolphinscheduler:
      ipv4_address: 172.15.0.18
...
</code></pre>
<ul>
<li>DolphinScheduler配置SeaTunnel分组及环境配置
<ul>
<li>
<p>安全中心-Worker分组管理,创建一个这个节点ip的分组,用于以后需要seatunnel的任务跑该分组<br>
<img alt="file" loading="lazy" src="http://openwrite-whaleops.oss-cn-zhangjiakou.aliyuncs.com/31504_F1C3DFB781594BE39F07B98C496419B8" class="lazyload"></p>
</li>
<li>
<p>环境管理-创建环境,增加一个用于执行seatunnel的环境,同时需要绑定Worker分组为上一步创建的seatunnel分组<br>
<img alt="file" loading="lazy" src="http://openwrite-whaleops.oss-cn-zhangjiakou.aliyuncs.com/31504_FC3FA8A435C54181B520CA08D5643A0A" class="lazyload"></p>
</li>
<li>
<p>创建工作流定义,把上面的seatunnel任务配置填写上<br>
<img alt="file" loading="lazy" src="http://openwrite-whaleops.oss-cn-zhangjiakou.aliyuncs.com/31504_9DFEFF5E2F0345A391252249197C0F92" class="lazyload"></p>
</li>
<li>
<p>运行时候,选择seatunnel的worker分组和环境即可跑在这个集成了seatunnel的环境上<br>
<img alt="file" loading="lazy" src="http://openwrite-whaleops.oss-cn-zhangjiakou.aliyuncs.com/31504_0C92143136C2448780D15D46A3CBAB4A" class="lazyload"></p>
</li>
</ul>
</li>
</ul>
<p>转载自俊瑶先森<br>
原文链接:https://junyao.tech/posts/9c6867c5.html</p>
<blockquote>
<p>本文由 白鲸开源 提供发布支持!</p>
</blockquote><br><br>
来源:https://www.cnblogs.com/DolphinScheduler/p/18867705
頁: [1]
查看完整版本: HTTP接口数据也能定时同步入湖?用DolphinScheduler×SeaTunnel快速搞定!