[Building from Source] Adapting Apache SeaTunnel-Web to the Latest 2.3.4 Release

By admin, 2024-03-31

A new version of Apache SeaTunnel has been released. If you are interested, see the earlier article announcing the release.

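To use the new features of version 2.3.4, the SeaTunnel version that Apache SeaTunnel-Web depends on has to be upgraded. Some APIs in SeaTunnel 2.3.4 are incompatible with earlier versions, so the SeaTunnel-Web source code has to be modified to match. This article walks through those changes.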

Modifying and building the source

Clone the SeaTunnel-Web source code locally

git clone https://github.com/apache/seatunnel-web.git

Open the project in IntelliJ IDEA

Upgrade the SeaTunnel version in the POM to 2.3.4 and re-import the dependencies

Change 2.3.3 to 2.3.4

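Most users run SeaTunnel Web builds that were adapted to SeaTunnel 2.3.3. Some APIs changed in the newly released SeaTunnel 2.3.4, so bumping the version alone leads to API incompatibility errors. That is the focus of this article: the parts of the SeaTunnel Web source that call SeaTunnel APIs have to be modified, and once they are, SeaTunnel Web is fully adapted to 2.3.4.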

The community has set up a dedicated group for the 2.3.X and Web series. If you are interested, contact the community assistant to join.

org.apache.dolphinscheduler.api.dto.seatunnel.bean.engine.EngineDataType

    public static class SeaTunnelDataTypeConvertor
            implements DataTypeConvertor<SeaTunnelDataType<?>> {

        @Override
        public SeaTunnelDataType<?> toSeaTunnelType(String engineDataType) {
            return DATA_TYPE_MAP.get(engineDataType.toLowerCase(Locale.ROOT)).getRawType();
        }

        @Override
        public SeaTunnelDataType<?> toSeaTunnelType(
                SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map)
                throws DataTypeConvertException {
            return seaTunnelDataType;
        }

        @Override
        public SeaTunnelDataType<?> toConnectorType(
                SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map)
                throws DataTypeConvertException {
            return seaTunnelDataType;
        }

        @Override
        public String getIdentity() {
            return "EngineDataTypeConvertor";
        }
    }

    // Change to:
    public static class SeaTunnelDataTypeConvertor
            implements DataTypeConvertor<SeaTunnelDataType<?>> {

        // The first argument is the field name (see the call
        // toSeaTunnelType(field.getName(), field.getType()) in TableSchemaServiceImpl),
        // so the type lookup uses the second argument.
        @Override
        public SeaTunnelDataType<?> toSeaTunnelType(String field, String engineDataType) {
            return DATA_TYPE_MAP.get(engineDataType.toLowerCase(Locale.ROOT)).getRawType();
        }

        @Override
        public SeaTunnelDataType<?> toSeaTunnelType(
                String field, SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map) {
            return seaTunnelDataType;
        }

        @Override
        public SeaTunnelDataType<?> toConnectorType(
                String field, SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map) {
            return seaTunnelDataType;
        }

        @Override
        public String getIdentity() {
            return "EngineDataTypeConvertor";
        }
    }
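The change above boils down to every conversion method gaining a leading field-name argument. The following is a minimal, self-contained sketch of that shape. `LegacyConvertor`, `FieldAwareConvertor`, `ConvertorAdapterSketch` and the contents of `DATA_TYPE_MAP` are hypothetical stand-ins invented for the illustration; they are not SeaTunnel classes, and the sketch only shows how a one-argument lookup can be wrapped to satisfy a two-argument signature.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical stand-ins, NOT SeaTunnel classes: they only mirror the shape of the
// change (the conversion method gains a leading field-name argument).
interface LegacyConvertor<T> {
    T convert(String engineDataType);
}

interface FieldAwareConvertor<T> {
    T convert(String fieldName, String engineDataType);
}

class ConvertorAdapterSketch {
    // Assumed lookup table, for the illustration only.
    static final Map<String, String> DATA_TYPE_MAP = new HashMap<>();

    static {
        DATA_TYPE_MAP.put("string", "STRING");
        DATA_TYPE_MAP.put("bigint", "LONG");
    }

    // Wraps an old one-argument convertor so it fits the new two-argument shape.
    // The field name is accepted but ignored; the lookup still uses the type string.
    static <T> FieldAwareConvertor<T> adapt(LegacyConvertor<T> legacy) {
        return (fieldName, engineDataType) -> legacy.convert(engineDataType);
    }

    public static void main(String[] args) {
        LegacyConvertor<String> legacy =
                type -> DATA_TYPE_MAP.get(type.toLowerCase(Locale.ROOT));
        FieldAwareConvertor<String> adapted = adapt(legacy);
        System.out.println(adapted.convert("user_id", "BIGINT"));
    }
}
```

The point to watch when porting by hand is argument order: the type string moves to the second position, so a lookup that still reads the first argument would silently search by field name.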

org.apache.seatunnel.app.service.impl.TableSchemaServiceImpl

    public TableSchemaServiceImpl() throws IOException {
        Common.setStarter(true);
        Set<PluginIdentifier> pluginIdentifiers =
                SeaTunnelSinkPluginDiscovery.getAllSupportedPlugins(PluginType.SINK).keySet();
        ArrayList<PluginIdentifier> pluginIdentifiersList = new ArrayList<>();
        pluginIdentifiersList.addAll(pluginIdentifiers);
        List<URL> pluginJarPaths =
                new SeaTunnelSinkPluginDiscovery().getPluginJarPaths(pluginIdentifiersList);
        // Path path = new SeaTunnelSinkPluginDiscovery().getPluginDir();
        if (!pluginJarPaths.isEmpty()) {
            // List<URL> files = FileUtils.searchJarFiles(path);
            pluginJarPaths.addAll(FileUtils.searchJarFiles(Common.pluginRootDir()));
            factory =
                    new DataTypeConvertorFactory(
                            new URLClassLoader(pluginJarPaths.toArray(new URL[0])));
        } else {
            factory = new DataTypeConvertorFactory();
        }
    }

    // Change to:
    public TableSchemaServiceImpl() throws IOException {
        Common.setStarter(true);
        Set<PluginIdentifier> pluginIdentifiers =
                SeaTunnelSinkPluginDiscovery.getAllSupportedPlugins(PluginType.SINK).keySet();
        ArrayList<PluginIdentifier> pluginIdentifiersList = new ArrayList<>();
        pluginIdentifiersList.addAll(pluginIdentifiers);
        List<URL> pluginJarPaths =
                new SeaTunnelSinkPluginDiscovery().getPluginJarPaths(pluginIdentifiersList);
        // Path path = new SeaTunnelSinkPluginDiscovery().getPluginDir();
        if (!pluginJarPaths.isEmpty()) {
            // List<URL> files = FileUtils.searchJarFiles(path);
            pluginJarPaths.addAll(FileUtils.searchJarFiles(Common.pluginRootDir()));
            factory =
                    new DataTypeConvertorFactory(
                            new URLClassLoader(pluginJarPaths.toArray(new URL[0])));
        } else {
            factory = new DataTypeConvertorFactory();
        }
    }

    SeaTunnelDataType<?> dataType = convertor.toSeaTunnelType(field.getType());

    // Change to:
    SeaTunnelDataType<?> dataType =
            convertor.toSeaTunnelType(field.getName(), field.getType());
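The constructor above follows a common pattern: gather the plugin jar URLs, build a dedicated `URLClassLoader` when any were found, and fall back to a default otherwise. Here is a standalone sketch of that pattern using only the JDK. `PluginClassLoaderSketch`, `searchJarFiles` and `chooseClassLoader` are names made up for this example; in particular this `searchJarFiles` only looks at one directory level and is not the SeaTunnel `FileUtils.searchJarFiles` implementation.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class PluginClassLoaderSketch {

    // Collects the URLs of all *.jar files directly under dir.
    // Returns an empty list when dir does not exist or is not a directory.
    static List<URL> searchJarFiles(Path dir) throws IOException {
        List<URL> jars = new ArrayList<>();
        if (!Files.isDirectory(dir)) {
            return jars;
        }
        List<Path> entries;
        try (Stream<Path> stream = Files.list(dir)) {
            entries = stream.collect(Collectors.toList());
        }
        for (Path entry : entries) {
            if (entry.getFileName().toString().endsWith(".jar")) {
                jars.add(entry.toUri().toURL());
            }
        }
        return jars;
    }

    // Uses a dedicated URLClassLoader when jars were found,
    // otherwise the current thread's context class loader.
    static ClassLoader chooseClassLoader(List<URL> jarUrls) {
        if (jarUrls.isEmpty()) {
            return Thread.currentThread().getContextClassLoader();
        }
        return new URLClassLoader(jarUrls.toArray(new URL[0]));
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("plugins");
        Files.createFile(dir.resolve("connector-demo.jar"));
        List<URL> jars = searchJarFiles(dir);
        System.out.println(jars.size() + " jar(s), loader = "
                + chooseClassLoader(jars).getClass().getSimpleName());
    }
}
```

Note that `toArray(new URL[0])` needs the list to be typed as `List<URL>`; with a raw `List` the call returns `Object[]` and the `URLClassLoader` constructor does not compile.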

org.apache.seatunnel.app.service.impl.JobExecutorServiceImpl.executeJobBySeaTunnel()

    public Long executeJobBySeaTunnel(Integer userId, String filePath, Long jobInstanceId) {
        Common.setDeployMode(DeployMode.CLIENT);
        JobConfig jobConfig = new JobConfig();
        jobConfig.setName(jobInstanceId + "_job");
        try {
            SeaTunnelConfig seaTunnelConfig = new YamlSeaTunnelConfigBuilder().build();
            SeaTunnelClient seaTunnelClient = createSeaTunnelClient();
            ClientJobExecutionEnvironment jobExecutionEnv =
                    seaTunnelClient.createExecutionContext(filePath, jobConfig, seaTunnelConfig);
            final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
            JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId);
            jobInstance.setJobEngineId(Long.toString(clientJobProxy.getJobId()));
            jobInstanceDao.update(jobInstance);
            CompletableFuture.runAsync(
                    () -> {
                        waitJobFinish(
                                clientJobProxy,
                                userId,
                                jobInstanceId,
                                Long.toString(clientJobProxy.getJobId()),
                                seaTunnelClient);
                    });
        } catch (ExecutionException | InterruptedException e) {
            ExceptionUtils.getMessage(e);
            throw new RuntimeException(e);
        }
        return jobInstanceId;
    }
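The method above submits the job, records its engine ID, and then hands the blocking wait to `CompletableFuture.runAsync` so the caller gets the instance ID back immediately. A reduced sketch of that submit-then-wait-asynchronously flow, using only the JDK, might look like this. `AsyncJobWaitSketch`, `JOB_STATUS` and `submit` are hypothetical; the in-memory map merely stands in for the job instance table.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

class AsyncJobWaitSketch {
    // Hypothetical in-memory stand-in for the job instance table.
    static final Map<Long, String> JOB_STATUS = new ConcurrentHashMap<>();

    // Marks the job as RUNNING, then waits for its final status on another thread
    // so the calling thread is not blocked. The returned future completes once the
    // final status has been stored.
    static CompletableFuture<Void> submit(long jobInstanceId, Supplier<String> blockingWait) {
        JOB_STATUS.put(jobInstanceId, "RUNNING");
        return CompletableFuture.runAsync(
                () -> JOB_STATUS.put(jobInstanceId, blockingWait.get()));
    }

    public static void main(String[] args) {
        CompletableFuture<Void> done = submit(1L, () -> "FINISHED");
        done.join(); // only the demo blocks here; a service would return right away
        System.out.println(JOB_STATUS.get(1L));
    }
}
```

`runAsync` without an executor argument runs on the common fork-join pool; a long-lived service may prefer to pass its own executor, which is a design choice outside the scope of the original change.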

org.apache.seatunnel.app.service.impl.JobInstanceServiceImpl

    else if (statusList.contains("CANCELLING")) {
        jobStatus = JobStatus.CANCELLING.name();

    // Change to:
    else if (statusList.contains("CANCELING")) {
        jobStatus = JobStatus.CANCELING.name();
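Because the constant was renamed from `CANCELLING` to `CANCELING`, any status string written with the old spelling would no longer match. Whether such strings exist in a given deployment is an assumption; the article does not say. If they do, a small normalising step before the enum lookup is one way to cope, sketched here with a reduced stand-in enum (`JobStatusAliasSketch.Status` is hypothetical and is not the SeaTunnel `JobStatus` class).

```java
import java.util.Locale;

class JobStatusAliasSketch {
    // Reduced stand-in for the engine's status enum; only what the example needs.
    enum Status { RUNNING, CANCELING, FINISHED }

    // Maps the pre-2.3.4 spelling onto the renamed constant, then resolves the enum.
    // Throws IllegalArgumentException for names that match no constant.
    static Status parse(String raw) {
        String name = raw.trim().toUpperCase(Locale.ROOT);
        if (name.equals("CANCELLING")) {
            name = "CANCELING";
        }
        return Status.valueOf(name);
    }

    public static void main(String[] args) {
        System.out.println(parse("CANCELLING"));
        System.out.println(parse("canceling"));
    }
}
```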

org.apache.seatunnel.app.service.impl.SchemaDerivationServiceImpl

    TableFactoryContext context =
            new TableFactoryContext(
                    Collections.singletonList(table),
                    ReadonlyConfig.fromMap(config),
                    Thread.currentThread().getContextClassLoader());

    // Change to:
    TableTransformFactoryContext context =
            new TableTransformFactoryContext(
                    Collections.singletonList(table),
                    ReadonlyConfig.fromMap(config),
                    Thread.currentThread().getContextClassLoader());

org.apache.seatunnel.app.thirdparty.engine.SeaTunnelEngineProxy

    public void restoreJob(
            @NonNull String filePath, @NonNull Long jobInstanceId, @NonNull Long jobEngineId) {
        SeaTunnelClient seaTunnelClient = new SeaTunnelClient(clientConfig);
        JobConfig jobConfig = new JobConfig();
        jobConfig.setName(jobInstanceId + "_job");
        try {
            seaTunnelClient.restoreExecutionContext(filePath, jobConfig, jobEngineId).execute();
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    // Change to:
    public void restoreJob(
            @NonNull String filePath, @NonNull Long jobInstanceId, @NonNull Long jobEngineId) {
        SeaTunnelClient seaTunnelClient = new SeaTunnelClient(clientConfig);
        JobConfig jobConfig = new JobConfig();
        jobConfig.setName(jobInstanceId + "_job");
        SeaTunnelConfig seaTunnelConfig = new YamlSeaTunnelConfigBuilder().build();
        try {
            seaTunnelClient
                    .restoreExecutionContext(filePath, jobConfig, seaTunnelConfig, jobEngineId)
                    .execute();
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

org.apache.seatunnel.app.thirdparty.framework.PluginDiscoveryUtil

    public static Map<PluginIdentifier, ConnectorFeature> getConnectorFeatures(
            PluginType pluginType) throws IOException {
        Common.setStarter(true);
        if (!pluginType.equals(PluginType.SOURCE)) {
            throw new UnsupportedOperationException("ONLY support plugin type source");
        }
        Path path = new SeaTunnelSinkPluginDiscovery().getPluginDir();
        List<Factory> factories;
        if (path.toFile().exists()) {
            List<URL> files = FileUtils.searchJarFiles(path);
            factories =
                    FactoryUtil.discoverFactories(new URLClassLoader(files.toArray(new URL[0])));
        } else {
            factories =
                    FactoryUtil.discoverFactories(Thread.currentThread().getContextClassLoader());
        }
        Map<PluginIdentifier, ConnectorFeature> featureMap = new ConcurrentHashMap<>();
        factories.forEach(
                plugin -> {
                    if (TableSourceFactory.class.isAssignableFrom(plugin.getClass())) {
                        TableSourceFactory tableSourceFactory = (TableSourceFactory) plugin;
                        PluginIdentifier info =
                                PluginIdentifier.of(
                                        "seatunnel",
                                        PluginType.SOURCE.getType(),
                                        plugin.factoryIdentifier());
                        featureMap.put(
                                info,
                                new ConnectorFeature(
                                        SupportColumnProjection.class.isAssignableFrom(
                                                tableSourceFactory.getSourceClass())));
                    }
                });
        return featureMap;
    }

    // Change to:
    public static Map<PluginIdentifier, ConnectorFeature> getConnectorFeatures(
            PluginType pluginType) {
        Common.setStarter(true);
        if (!pluginType.equals(PluginType.SOURCE)) {
            throw new UnsupportedOperationException("ONLY support plugin type source");
        }
        ArrayList<PluginIdentifier> pluginIdentifiers = new ArrayList<>();
        pluginIdentifiers.addAll(
                SeaTunnelSinkPluginDiscovery.getAllSupportedPlugins(PluginType.SOURCE).keySet());
        List<URL> pluginJarPaths =
                new SeaTunnelSinkPluginDiscovery().getPluginJarPaths(pluginIdentifiers);
        List<Factory> factories;
        if (!pluginJarPaths.isEmpty()) {
            factories =
                    FactoryUtil.discoverFactories(
                            new URLClassLoader(pluginJarPaths.toArray(new URL[0])));
        } else {
            factories =
                    FactoryUtil.discoverFactories(Thread.currentThread().getContextClassLoader());
        }
        Map<PluginIdentifier, ConnectorFeature> featureMap = new ConcurrentHashMap<>();
        factories.forEach(
                plugin -> {
                    if (TableSourceFactory.class.isAssignableFrom(plugin.getClass())) {
                        TableSourceFactory tableSourceFactory = (TableSourceFactory) plugin;
                        PluginIdentifier info =
                                PluginIdentifier.of(
                                        "seatunnel",
                                        PluginType.SOURCE.getType(),
                                        plugin.factoryIdentifier());
                        featureMap.put(
                                info,
                                new ConnectorFeature(
                                        SupportColumnProjection.class.isAssignableFrom(
                                                tableSourceFactory.getSourceClass())));
                    }
                });
        return featureMap;
    }
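Both versions of the method decide whether a connector supports column projection by testing its source class with `Class.isAssignableFrom` against a marker interface. As a rough standalone illustration of that check, the sketch below uses made-up types: `SupportsProjection`, `JdbcLikeSource`, `FileLikeSource` and `FeatureDetectionSketch` are hypothetical and only mimic the role that `SupportColumnProjection` and the real source classes play.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical marker interface and source classes, for illustration only.
interface SupportsProjection {}

class JdbcLikeSource implements SupportsProjection {}

class FileLikeSource {}

class FeatureDetectionSketch {

    // For each class, records whether it implements the marker interface.
    // Keys are simple class names, kept in input order.
    static Map<String, Boolean> projectionSupport(List<Class<?>> sourceClasses) {
        Map<String, Boolean> features = new LinkedHashMap<>();
        for (Class<?> sourceClass : sourceClasses) {
            features.put(
                    sourceClass.getSimpleName(),
                    SupportsProjection.class.isAssignableFrom(sourceClass));
        }
        return features;
    }

    public static void main(String[] args) {
        List<Class<?>> classes = Arrays.asList(JdbcLikeSource.class, FileLikeSource.class);
        System.out.println(projectionSupport(classes));
    }
}
```

The check works on `Class` objects alone, so no source instance has to be created, which is why it suits plugin discovery.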

Format the code

mvn spotless:apply

Build and package

mvn clean package -DskipTests

At this point SeaTunnel Web has been adapted to SeaTunnel 2.3.4. The installation package is generated under the seatunnel-web-dist/target directory.

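Deployment and testing on Linux

For the details, refer to the article "Apache SeaTunnel Web Deployment Guide" published earlier by another community contributor.

Important configuration items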

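1. seatunnel-web database configuration (application.yml)
   Used to persist the web service's data.
2. SEATUNNEL_HOME (environment variable)
   seatunnel-web calls SeaTunnel's plugin discovery API, which scans this location for the available connectors.
3. ST_WEB_HOME (environment variable)
   seatunnel-web loads the plugin packages under seatunnel-web/datasource. This determines which data sources can be used in seatunnel-web job definitions.
4. Important configuration files:
   connector-datasource-mapper.yaml
   Lists the supported data source types and what each one supports for data synchronization (for example multi-table sync or CDC).
   hazelcast-client.yaml
   seatunnel-web talks to the SeaTunnel cluster through seatunnel-api. This file holds the cluster node information.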
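Since two of the items above are environment variables, a quick pre-flight check before starting the service can save some debugging. The following is only a sketch under the assumption that both variables should point at existing directories; `EnvCheckSketch` and `problems` are hypothetical names, and nothing like this ships with seatunnel-web as far as this article shows.

```java
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

class EnvCheckSketch {
    // The two environment variables named in the list above.
    static final List<String> REQUIRED = Arrays.asList("SEATUNNEL_HOME", "ST_WEB_HOME");

    // Returns one message per variable that is unset, empty,
    // or does not point at an existing directory.
    static List<String> problems(Map<String, String> env) {
        List<String> messages = new ArrayList<>();
        for (String name : REQUIRED) {
            String value = env.get(name);
            if (value == null || value.isEmpty()) {
                messages.add(name + " is not set");
            } else if (!Files.isDirectory(Paths.get(value))) {
                messages.add(name + " does not point to a directory: " + value);
            }
        }
        return messages;
    }

    public static void main(String[] args) {
        List<String> messages = problems(System.getenv());
        if (messages.isEmpty()) {
            System.out.println("environment looks OK");
        } else {
            messages.forEach(System.out::println);
        }
    }
}
```

Taking the environment as a `Map` argument rather than reading `System.getenv()` inside the method keeps the check easy to exercise with made-up values.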

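Thanks for reading. I hope this helps, and if you have any questions you are welcome to find me in the community!

Publication of this article is supported by WhaleOps (白鲸开源科技).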
