【源码编译】Apache SeaTunnel-Web 适配最新2.3.4版本教程
后台-插件-广告管理-内容页头部广告(手机) |
Apache SeaTunnel新版本已经发布,感兴趣的小伙伴可以看之前版本发布的文章
本文主要给大家介绍为使用2.3.4版本的新特性,需要对Apache SeaTunnel-Web依赖的版本进行升级,而SeaTunnel2.3.4版本部分API跟之前版本不兼容,所以需要对 SeaTunnel-Web的源码进行修改适配。
源码修改编译
克隆SeaYunnel-Web源码到本地
git clone https://github.com/apache/seatunnel-web.git在idea中打开项目
升级Pom中的SeaTunnel版本到2.3.4并重新导入依赖
- 1
- 2
因为大部分用户使用SeaTunnel Web都是基于SeaTunnel-2.3.3 版本做的适配,而最新发布的SeaTunnel2.3.4 部分API发生了改动导致直接升级的过程中会出现API不兼容的问题,所以本篇文章重点来了:我们需要对调用SeaTunnel API的SeaTunnel Web源码部分进行修改,修改完之后,我们就能完全适配2.3.4最新版本。
社区推出了2.3.X及Web系列专属的社群,感兴趣的小伙伴可以加社区小助手进群。
org.apache.dolphinscheduler.api.dto.seatunnel.bean.engine.EngineDataType
public static class SeaTunnelDataTypeConvertor implements DataTypeConvertor- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
org.apache.seatunnel.app.service.impl.TableSchemaServiceImpl
public TableSchemaServiceImpl() throws IOException { Common.setStarter(true); Set- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
org.apache.seatunnel.app.service.impl.JobExecutorServiceImpl.executeJobBySeaTunnel()
public Long executeJobBySeaTunnel(Integer userId, String filePath, Long jobInstanceId) { Common.setDeployMode(DeployMode.CLIENT); JobConfig jobConfig = new JobConfig(); jobConfig.setName(jobInstanceId + "_job"); try { SeaTunnelConfig seaTunnelConfig = new YamlSeaTunnelConfigBuilder().build(); SeaTunnelClient seaTunnelClient = createSeaTunnelClient(); ClientJobExecutionEnvironment jobExecutionEnv = seaTunnelClient.createExecutionContext(filePath, jobConfig, seaTunnelConfig); final ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId); jobInstance.setJobEngineId(Long.toString(clientJobProxy.getJobId())); jobInstanceDao.update(jobInstance); CompletableFuture.runAsync( () -> { waitJobFinish( clientJobProxy, userId, jobInstanceId, Long.toString(clientJobProxy.getJobId()), seaTunnelClient); }); } catch (ExecutionException | InterruptedException e) { ExceptionUtils.getMessage(e); throw new RuntimeException(e); } return jobInstanceId; }- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
org.apache.seatunnel.app.service.impl.JobInstanceServiceImpl
else if (statusList.contains("CANCELLING")) { jobStatus = JobStatus.CANCELLING.name(); // 改为 else if (statusList.contains("CANCELING")) { jobStatus = JobStatus.CANCELING.name();- 1
- 2
- 3
- 4
org.apache.seatunnel.app.service.impl.SchemaDerivationServiceImpl
TableFactoryContext context = new TableFactoryContext( Collections.singletonList(table), ReadonlyConfig.fromMap(config), Thread.currentThread().getContextClassLoader()); // 改为 TableTransformFactoryContext context = new TableTransformFactoryContext( Collections.singletonList(table), ReadonlyConfig.fromMap(config), Thread.currentThread().getContextClassLoader());- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
org.apache.seatunnel.app.thirdparty.engine.SeaTunnelEngineProxy
public void restoreJob( @NonNull String filePath, @NonNull Long jobInstanceId, @NonNull Long jobEngineId) { SeaTunnelClient seaTunnelClient = new SeaTunnelClient(clientConfig); JobConfig jobConfig = new JobConfig(); jobConfig.setName(jobInstanceId + "_job"); try { seaTunnelClient.restoreExecutionContext(filePath, jobConfig, jobEngineId).execute(); } catch (ExecutionException e) { throw new RuntimeException(e); } catch (InterruptedException e) { throw new RuntimeException(e); } } // 改为 public void restoreJob( @NonNull String filePath, @NonNull Long jobInstanceId, @NonNull Long jobEngineId) { SeaTunnelClient seaTunnelClient = new SeaTunnelClient(clientConfig); JobConfig jobConfig = new JobConfig(); jobConfig.setName(jobInstanceId + "_job"); SeaTunnelConfig seaTunnelConfig = new YamlSeaTunnelConfigBuilder().build(); try { seaTunnelClient .restoreExecutionContext(filePath, jobConfig, seaTunnelConfig, jobEngineId) .execute(); } catch (ExecutionException e) { throw new RuntimeException(e); } catch (InterruptedException e) { throw new RuntimeException(e); } }- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
org.apache.seatunnel.app.thirdparty.framework.PluginDiscoveryUtil
public static Map- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
代码格式化
mvn spotless:apply编译打包
mvn clean package -DskipTests至此,seatunnel web 适配 seatunnel2.3.4版本完成,对应的安装包会在 seatunnel-web-dist/target目录下生成
Linux部署测试
这里具体请参考之前社区其他老师发布的文章Apache SeaTunnel Web部署指南
重要的配置项
1、seatunnel-web数据库相关配置(application.yml) 用来web服务中的数据持久化 2、SEATUNNEL_HOME(环境变量) seatunnel-web调用seaunnel的插件获取的API,扫描connector相关的连接器 3、ST_WEB_HOME(环境变量) seatunnel-web会加载seatunnel-web/datasource下的插件包,这里决定了seatunnel-web支持哪些数据源的任务定义 4、重要的配置文件: connector-datasource-mapper.yaml 该配置文件配置了支持的数据源类型以及该数据源支持的数据同步方式等信息(比如是否支持多表同步、是否支持cdc等) hazelcast-client.yaml seatunnel-web服务通过seatunnel-api的方式与seatunnel集群进行交互,该配置文件配置了集群节点等相关信息- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
感谢大家的阅读,希望对各位兄弟有所帮助,如果有任何疑问,欢迎来社区找我交流!
本文由 白鲸开源科技 提供发布支持!
1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。
在线投稿:投稿 站长QQ:1888636
后台-插件-广告管理-内容页尾部广告(手机) |