数据源 Client SDK 的使用

    建立Scala的测试类LinkisDataSourceClientTest,具体接口含义可以见注释:

    1. import com.fasterxml.jackson.databind.ObjectMapper
    2. import org.apache.linkis.common.utils.JsonUtils
    3. import org.apache.linkis.datasource.client.impl.{LinkisDataSourceRemoteClient, LinkisMetaDataRemoteClient}
    4. import org.apache.linkis.datasource.client.request.{CreateDataSourceAction, GetAllDataSourceTypesAction, MetadataGetDatabasesAction, UpdateDataSourceParameterAction}
    5. import org.apache.linkis.datasourcemanager.common.domain.{DataSource, DataSourceType}
    6. import org.apache.linkis.httpclient.dws.authentication.StaticAuthenticationStrategy
    7. import org.apache.linkis.httpclient.dws.config.{DWSClientConfig, DWSClientConfigBuilder}
    8. import java.io.StringWriter
    9. import java.util
    10. import java.util.concurrent.TimeUnit
    11. object LinkisDataSourceClientTest {
    /**
     * Entry point of the sample: builds the gateway client configuration, creates the
     * data source and metadata remote clients, then runs each sample call in turn.
     *
     * @param args command-line arguments (not used)
     */
    def main(args: Array[String]): Unit = {
      val clientConfig = DWSClientConfigBuilder.newBuilder
        .addServerUrl("http://127.0.0.1:9001") // linkis-mg-gateway url: http://{ip}:{port}
        .connectionTimeout(30000) // connection timeout
        .discoveryEnabled(false) // disable discovery
        .discoveryFrequency(1, TimeUnit.MINUTES) // discovery frequency
        .loadbalancerEnabled(true) // enable load balancing
        .maxConnectionSize(5) // max number of connections
        .retryEnabled(false) // disable retry
        .readTimeout(30000) // read timeout
        .setAuthenticationStrategy(new StaticAuthenticationStrategy) // Linkis authentication supports static and token strategies
        .setAuthTokenKey("hadoop") // submit user
        .setAuthTokenValue("xxx") // password or token
        .setDWSVersion("v1") // linkis rest version v1
        .build
      // init datasource remote client
      val dataSourceClient = new LinkisDataSourceRemoteClient(clientConfig)
      // init metadata remote client
      val metaDataClient = new LinkisMetaDataRemoteClient(clientConfig)
      // get all datasource types
      testGetAllDataSourceTypes(dataSourceClient)
      // create kafka datasource (this call was missing, leaving the kafka sample unused)
      testCreateDataSourceForKafka(dataSourceClient)
      // create es datasource
      testCreateDataSourceForEs(dataSourceClient)
      // update datasource parameter for kafka
      testUpdateDataSourceParameterForKafka(dataSourceClient)
      // update datasource parameter for es
      testUpdateDataSourceParameterForEs(dataSourceClient)
      // get hive metadata database list
      testMetadataGetDatabases(metaDataClient)
    }
    /**
     * Requests all data source types as user "hadoop" and prints the returned
     * value to standard output.
     *
     * @param client data source remote client used to send the request
     */
    def testGetAllDataSourceTypes(client: LinkisDataSourceRemoteClient): Unit = {
      val listTypesAction = GetAllDataSourceTypesAction.builder().setUser("hadoop").build()
      val typesResult = client.getAllDataSourceTypes(listTypesAction)
      System.out.println(typesResult.getAllDataSourceType)
    }
    /**
     * Builds a data source named "kafka-test" together with its "kafka" data source
     * type, converts it to a map through a JSON round trip, submits that map as the
     * create-data-source payload for user "hadoop", and prints the insert id read
     * from the result.
     *
     * @param client data source remote client used to send the request
     */
    def testCreateDataSourceForKafka(client: LinkisDataSourceRemoteClient): Unit = {
      // describe the data source type the new data source refers to
      val kafkaType = new DataSourceType
      kafkaType.setName("kafka")
      kafkaType.setId("2")
      kafkaType.setLayers(2)
      kafkaType.setClassifier("消息队列")
      kafkaType.setDescription("kafka")

      val kafkaSource = new DataSource()
      kafkaSource.setDataSourceType(kafkaType)
      kafkaSource.setDataSourceName("kafka-test")
      kafkaSource.setCreateSystem("client")
      kafkaSource.setDataSourceTypeId(2L)

      // serialize the bean to JSON, then read it back as a generic map for the request payload
      val jsonBuffer = new StringWriter
      JsonUtils.jackson.writeValue(jsonBuffer, kafkaSource)
      val payload = new ObjectMapper().readValue(jsonBuffer.toString, new util.HashMap[String, Any]().getClass)

      val createAction = CreateDataSourceAction.builder().setUser("hadoop").addRequestPayloads(payload).build()
      val insertedId = client.createDataSource(createAction).getInsert_id
      System.out.println(insertedId)
    }
    /**
     * Builds a data source named "es-test", converts it to a map through a JSON
     * round trip, submits that map as the create-data-source payload for user
     * "hadoop", and prints the insert id read from the result.
     *
     * The method header was missing from the listing although `main` calls
     * `testCreateDataSourceForEs(dataSourceClient)`; it is restored here.
     *
     * @param client data source remote client used to send the request
     */
    def testCreateDataSourceForEs(client: LinkisDataSourceRemoteClient): Unit = {
      val dataSource = new DataSource()
      dataSource.setDataSourceName("es-test")
      dataSource.setCreateSystem("client")
      // NOTE(review): type id 7 is presumably the Elasticsearch type on the target server - confirm
      dataSource.setDataSourceTypeId(7L)
      // serialize the bean to JSON, then read it back as a generic map for the request payload
      val dsJsonWriter = new StringWriter
      val mapper = new ObjectMapper
      JsonUtils.jackson.writeValue(dsJsonWriter, dataSource)
      val map = mapper.readValue(dsJsonWriter.toString, new util.HashMap[String, Any]().getClass)
      val id = client.createDataSource(CreateDataSourceAction.builder().setUser("hadoop").addRequestPayloads(map).build()).getInsert_id
      System.out.println(id)
    }
    /**
     * Sends an update-parameter request for data source id "7" as user "hadoop".
     * The payload carries a "connectParams" map with the Kafka "brokers" address
     * and a "comment" entry.
     *
     * @param client data source remote client used to send the request
     */
    def testUpdateDataSourceParameterForKafka(client: LinkisDataSourceRemoteClient): Unit = {
      // connection settings nested under "connectParams"
      val connectSettings = new util.HashMap[String, Any]()
      connectSettings.put("brokers", "172.24.2.232:9092")

      val payload = new util.HashMap[String, Any]()
      payload.put("connectParams", connectSettings)
      payload.put("comment", "kafka data source")

      val updateAction = UpdateDataSourceParameterAction.builder()
        .setUser("hadoop")
        .setDataSourceId("7")
        .addRequestPayloads(payload)
        .build()
      client.updateDataSourceParameter(updateAction)
    }
    /**
     * Sends an update-parameter request for data source id "8" as user "hadoop".
     * The payload carries a "connectParams" map whose "elasticUrls" entry is a
     * one-element list of URLs, plus a "comment" entry.
     *
     * @param client data source remote client used to send the request
     */
    def testUpdateDataSourceParameterForEs(client: LinkisDataSourceRemoteClient): Unit = {
      // the URL list is stored as a java.util.List under "elasticUrls"
      val urlList = new util.ArrayList[String]()
      urlList.add("http://172.24.2.231:9200")

      val connectSettings = new util.HashMap[String, Any]()
      connectSettings.put("elasticUrls", urlList)

      val payload = new util.HashMap[String, Any]()
      payload.put("connectParams", connectSettings)
      payload.put("comment", "es data source")

      val updateAction = UpdateDataSourceParameterAction.builder()
        .setUser("hadoop")
        .setDataSourceId("8")
        .addRequestPayloads(payload)
        .build()
      client.updateDataSourceParameter(updateAction)
    }
    /**
     * Requests the database list of data source id 9 as user "hadoop" with system
     * "client", and prints the returned value to standard output.
     *
     * @param client metadata remote client used to send the request
     */
    def testMetadataGetDatabases(client: LinkisMetaDataRemoteClient): Unit = {
      // setUser is applied once; the listing repeated the same call twice on the builder
      val getDatabasesAction = MetadataGetDatabasesAction.builder()
        .setUser("hadoop")
        .setDataSourceId(9L)
        .setSystem("client")
        .build()
      // print the databases so the sample produces visible output instead of discarding the result
      val databases = client.getDatabases(getDatabasesAction).getDbs
      System.out.println(databases)
    }