Using the DataSource Client SDK

2. Scala Test Code

Create the Scala test class LinkisDataSourceClientTest; the meaning of each interface is explained in the comments:

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.linkis.common.utils.JsonUtils
import org.apache.linkis.datasource.client.impl.{LinkisDataSourceRemoteClient, LinkisMetaDataRemoteClient}
import org.apache.linkis.datasource.client.request.{CreateDataSourceAction, GetAllDataSourceTypesAction, MetadataGetDatabasesAction, UpdateDataSourceParameterAction}
import org.apache.linkis.datasourcemanager.common.domain.{DataSource, DataSourceType}
import org.apache.linkis.httpclient.dws.authentication.StaticAuthenticationStrategy
import org.apache.linkis.httpclient.dws.config.{DWSClientConfig, DWSClientConfigBuilder}

import java.io.StringWriter
import java.util
import java.util.concurrent.TimeUnit

object LinkisDataSourceClientTest {

  def main(args: Array[String]): Unit = {
    val clientConfig = DWSClientConfigBuilder.newBuilder
      .addServerUrl("http://127.0.0.1:9001")   // set the linkis-mg-gateway url: http://{ip}:{port}
      .connectionTimeout(30000)                // connection timeout in milliseconds
      .discoveryEnabled(false)                 // disable service discovery
      .discoveryFrequency(1, TimeUnit.MINUTES) // discovery frequency
      .loadbalancerEnabled(true)               // enable load balancing
      .maxConnectionSize(5)                    // set the maximum number of connections
      .retryEnabled(false)                     // disable retry
      .readTimeout(30000)                      // read timeout in milliseconds
      .setAuthenticationStrategy(new StaticAuthenticationStrategy) // Linkis authentication supports static and token strategies
      .setAuthTokenKey("hadoop")               // set the submit user
      .setAuthTokenValue("xxx")                // set the password or token
      .setDWSVersion("v1")                     // Linkis REST version, v1
      .build

    // init the datasource remote client
    val dataSourceClient = new LinkisDataSourceRemoteClient(clientConfig)
    // init the metadata remote client
    val metaDataClient = new LinkisMetaDataRemoteClient(clientConfig)

    // get all datasource types
    testGetAllDataSourceTypes(dataSourceClient)
    // create a kafka datasource
    testCreateDataSourceForKafka(dataSourceClient)
    // create an es datasource
    testCreateDataSourceForEs(dataSourceClient)
    // update the datasource parameters for kafka
    testUpdateDataSourceParameterForKafka(dataSourceClient)
    // update the datasource parameters for es
    testUpdateDataSourceParameterForEs(dataSourceClient)
    // get the hive metadata database list
    testMetadataGetDatabases(metaDataClient)
  }

  def testGetAllDataSourceTypes(client: LinkisDataSourceRemoteClient): Unit = {
    val getAllDataSourceTypesResult = client.getAllDataSourceTypes(GetAllDataSourceTypesAction.builder().setUser("hadoop").build()).getAllDataSourceType
    System.out.println(getAllDataSourceTypesResult)
  }

  def testCreateDataSourceForKafka(client: LinkisDataSourceRemoteClient): Unit = {
    val dataSource = new DataSource
    val dataSourceType = new DataSourceType
    dataSourceType.setName("kafka")
    dataSourceType.setId("2")
    dataSourceType.setLayers(2)
    dataSourceType.setClassifier("消息队列") // classifier: message queue
    dataSourceType.setDescription("kafka")
    dataSource.setDataSourceType(dataSourceType)
    dataSource.setDataSourceName("kafka-test")
    dataSource.setCreateSystem("client")
    dataSource.setDataSourceTypeId(2L)
    // serialize the DataSource into a generic map and submit it as the request payload
    val dsJsonWriter = new StringWriter
    val mapper = new ObjectMapper
    JsonUtils.jackson.writeValue(dsJsonWriter, dataSource)
    val map = mapper.readValue(dsJsonWriter.toString, new util.HashMap[String, Any]().getClass)
    val id = client.createDataSource(CreateDataSourceAction.builder().setUser("hadoop").addRequestPayloads(map).build()).getInsert_id
    System.out.println(id)
  }

  def testCreateDataSourceForEs(client: LinkisDataSourceRemoteClient): Unit = {
    val dataSource = new DataSource
    dataSource.setDataSourceName("es-test")
    dataSource.setCreateSystem("client")
    dataSource.setDataSourceTypeId(7L)
    val dsJsonWriter = new StringWriter
    val mapper = new ObjectMapper
    JsonUtils.jackson.writeValue(dsJsonWriter, dataSource)
    val map = mapper.readValue(dsJsonWriter.toString, new util.HashMap[String, Any]().getClass)
    val id = client.createDataSource(CreateDataSourceAction.builder().setUser("hadoop").addRequestPayloads(map).build()).getInsert_id
    System.out.println(id)
  }

  def testUpdateDataSourceParameterForKafka(client: LinkisDataSourceRemoteClient): Unit = {
    val params = new util.HashMap[String, Any]()
    val connParams = new util.HashMap[String, Any]()
    connParams.put("brokers", "172.24.2.232:9092")
    params.put("connectParams", connParams)
    params.put("comment", "kafka data source")
    client.updateDataSourceParameter(UpdateDataSourceParameterAction.builder().setUser("hadoop").setDataSourceId("7").addRequestPayloads(params).build())
  }

  def testUpdateDataSourceParameterForEs(client: LinkisDataSourceRemoteClient): Unit = {
    val params = new util.HashMap[String, Any]()
    val connParams = new util.HashMap[String, Any]()
    val elasticUrls = new util.ArrayList[String]()
    elasticUrls.add("http://172.24.2.231:9200")
    connParams.put("elasticUrls", elasticUrls)
    params.put("connectParams", connParams)
    params.put("comment", "es data source")
    client.updateDataSourceParameter(UpdateDataSourceParameterAction.builder().setUser("hadoop").setDataSourceId("8").addRequestPayloads(params).build())
  }

  def testMetadataGetDatabases(client: LinkisMetaDataRemoteClient): Unit = {
    val dbs = client.getDatabases(MetadataGetDatabasesAction.builder().setUser("hadoop").setDataSourceId(9L).setSystem("client").build()).getDbs
    System.out.println(dbs)
  }
}
```
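
Note that createDataSource does not take the DataSource object directly: the example round-trips it through Jackson into a plain map and submits that map via addRequestPayloads. The standalone sketch below (Jackson only, no Linkis classes) prints the JSON body that the kafka example effectively sends; the field values simply mirror the example above and are illustrative, not required by the API:

```scala
import com.fasterxml.jackson.databind.ObjectMapper

// Standalone sketch: previews the JSON payload that the kafka example
// above submits via addRequestPayloads. Values mirror that example.
object PayloadPreview {
  def main(args: Array[String]): Unit = {
    val mapper = new ObjectMapper
    val payload = new java.util.HashMap[String, Any]()
    val dataSourceType = new java.util.HashMap[String, Any]()
    dataSourceType.put("name", "kafka")
    dataSourceType.put("id", "2")
    payload.put("dataSourceType", dataSourceType)
    payload.put("dataSourceName", "kafka-test")
    payload.put("createSystem", "client")
    payload.put("dataSourceTypeId", 2L)
    // prints e.g. {"dataSourceType":{"name":"kafka","id":"2"},"dataSourceName":"kafka-test",...}
    println(mapper.writeValueAsString(payload))
  }
}
```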
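
The same builder pattern can usually be extended from databases down to tables and columns. The following is a hedged sketch, assuming your Linkis version ships MetadataGetTablesAction and MetadataGetColumnsAction alongside MetadataGetDatabasesAction, with matching getTables/getColumns methods on LinkisMetaDataRemoteClient; the database and table names are hypothetical placeholders, so verify the class, setter, and getter names against the linkis-datasource-client release you depend on:

```scala
import org.apache.linkis.datasource.client.impl.LinkisMetaDataRemoteClient
import org.apache.linkis.datasource.client.request.{MetadataGetColumnsAction, MetadataGetTablesAction}

object MetadataBrowseSketch {
  // Hedged sketch: assumes these actions follow the same builder pattern
  // as MetadataGetDatabasesAction above; verify the names against your
  // linkis-datasource-client version before relying on them.
  def listTablesAndColumns(client: LinkisMetaDataRemoteClient): Unit = {
    val tables = client.getTables(
      MetadataGetTablesAction.builder()
        .setUser("hadoop")
        .setDataSourceId(9L)
        .setDatabase("default")   // hypothetical database name
        .setSystem("client")
        .build()
    ).getTables
    System.out.println(tables)

    val columns = client.getColumns(
      MetadataGetColumnsAction.builder()
        .setUser("hadoop")
        .setDataSourceId(9L)
        .setDatabase("default")   // hypothetical database name
        .setTable("my_table")     // hypothetical table name
        .setSystem("client")
        .build()
    ).getAllColumns
    System.out.println(columns)
  }
}
```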