From 04cbf34d0867570980658f83de79b089f529ad8f Mon Sep 17 00:00:00 2001 From: saf3dfsa Date: Wed, 13 Nov 2024 15:59:31 +0800 Subject: [PATCH] feat(dms): support to get kafka topic producers --- .../data-sources/dms_kafka_topic_producers.md | 53 +++++++ huaweicloud/provider.go | 1 + ...weicloud_dms_kafka_topic_producers_test.go | 44 ++++++ ...e_huaweicloud_dms_kafka_topic_producers.go | 138 ++++++++++++++++++ 4 files changed, 236 insertions(+) create mode 100644 docs/data-sources/dms_kafka_topic_producers.md create mode 100644 huaweicloud/services/acceptance/dms/data_source_huaweicloud_dms_kafka_topic_producers_test.go create mode 100644 huaweicloud/services/dms/data_source_huaweicloud_dms_kafka_topic_producers.go diff --git a/docs/data-sources/dms_kafka_topic_producers.md b/docs/data-sources/dms_kafka_topic_producers.md new file mode 100644 index 0000000000..c94d2e8c64 --- /dev/null +++ b/docs/data-sources/dms_kafka_topic_producers.md @@ -0,0 +1,53 @@ +--- +subcategory: "Distributed Message Service (DMS)" +layout: "huaweicloud" +page_title: "HuaweiCloud: huaweicloud_dms_kafka_topic_producers" +description: |- + Use this data source to get the list of Kafka topic producers. +--- + +# huaweicloud_dms_kafka_topic_producers + +Use this data source to get the list of Kafka topic producers. + +## Example Usage + +```hcl +variable "instance_id" {} +variable "topic" {} + +data "huaweicloud_dms_kafka_topic_producers" "test" { + instance_id = var.instance_id + topic = var.topic +} +``` + +## Argument Reference + +The following arguments are supported: + +* `region` - (Optional, String) Specifies the region in which to query the resource. + If omitted, the provider-level region will be used. + +* `instance_id` - (Required, String) Specifies the instance ID. + +* `topic` - (Required, String) Specifies the topic name. + +## Attribute Reference + +In addition to all arguments above, the following attributes are exported: + +* `id` - The data source ID. + +* `producers` - Indicates the producer list. + + The [producers](#producers_struct) structure is documented below. + + +The `producers` block supports: + +* `producer_address` - Indicates the producer address. + +* `broker_address` - Indicates the broker address. + +* `join_time` - Indicates the time when the broker was connected. diff --git a/huaweicloud/provider.go b/huaweicloud/provider.go index 7441c80dcb..e8a68b6be1 100644 --- a/huaweicloud/provider.go +++ b/huaweicloud/provider.go @@ -690,6 +690,7 @@ func Provider() *schema.Provider { "huaweicloud_dms_kafka_smart_connect_tasks": dms.DataSourceDmsKafkaSmartConnectTasks(), "huaweicloud_dms_kafkav2_smart_connect_tasks": dms.DataSourceDmsKafkav2SmartConnectTasks(), "huaweicloud_dms_kafka_user_client_quotas": dms.DataSourceDmsKafkaUserClientQuotas(), + "huaweicloud_dms_kafka_topic_producers": dms.DataSourceDmsKafkaTopicProducers(), "huaweicloud_dms_kafka_topics": dms.DataSourceDmsKafkaTopics(), "huaweicloud_dms_kafka_users": dms.DataSourceDmsKafkaUsers(), "huaweicloud_dms_kafka_message_diagnosis_tasks": dms.DataSourceDmsKafkaMessageDiagnosisTasks(), diff --git a/huaweicloud/services/acceptance/dms/data_source_huaweicloud_dms_kafka_topic_producers_test.go b/huaweicloud/services/acceptance/dms/data_source_huaweicloud_dms_kafka_topic_producers_test.go new file mode 100644 index 0000000000..bf89ebe54d --- /dev/null +++ b/huaweicloud/services/acceptance/dms/data_source_huaweicloud_dms_kafka_topic_producers_test.go @@ -0,0 +1,44 @@ +package dms + +import ( + "fmt" + "testing" + + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource" + + "github.com/huaweicloud/terraform-provider-huaweicloud/huaweicloud/services/acceptance" +) + +func TestAccDataSourceDmsKafkaTopicProducers_basic(t *testing.T) { + dataSource := "data.huaweicloud_dms_kafka_topic_producers.test" + dc := acceptance.InitDataSourceCheck(dataSource) + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { + acceptance.TestAccPreCheck(t) + acceptance.TestAccPreCheckDMSKafkaInstanceID(t) + acceptance.TestAccPreCheckDMSKafkaTopicName(t) + }, + ProviderFactories: acceptance.TestAccProviderFactories, + Steps: []resource.TestStep{ + { + Config: testDataSourceDmsKafkaTopicProducers_basic(), + Check: resource.ComposeTestCheckFunc( + dc.CheckResourceExists(), + resource.TestCheckResourceAttrSet(dataSource, "producers.#"), + resource.TestCheckResourceAttrSet(dataSource, "producers.0.producer_address"), + resource.TestCheckResourceAttrSet(dataSource, "producers.0.broker_address"), + resource.TestCheckResourceAttrSet(dataSource, "producers.0.join_time"), + ), + }, + }, + }) +} + +func testDataSourceDmsKafkaTopicProducers_basic() string { + return fmt.Sprintf(` +data "huaweicloud_dms_kafka_topic_producers" "test" { + instance_id = "%[1]s" + topic = "%[2]s" +}`, acceptance.HW_DMS_KAFKA_INSTANCE_ID, acceptance.HW_DMS_KAFKA_TOPIC_NAME) +} diff --git a/huaweicloud/services/dms/data_source_huaweicloud_dms_kafka_topic_producers.go b/huaweicloud/services/dms/data_source_huaweicloud_dms_kafka_topic_producers.go new file mode 100644 index 0000000000..2a91f9f1b6 --- /dev/null +++ b/huaweicloud/services/dms/data_source_huaweicloud_dms_kafka_topic_producers.go @@ -0,0 +1,138 @@ +// Generated by PMS #425 +package dms + +import ( + "context" + "strings" + + "github.com/hashicorp/go-multierror" + "github.com/hashicorp/go-uuid" + "github.com/hashicorp/terraform-plugin-sdk/v2/diag" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" + "github.com/tidwall/gjson" + + "github.com/huaweicloud/terraform-provider-huaweicloud/huaweicloud/config" + "github.com/huaweicloud/terraform-provider-huaweicloud/huaweicloud/helper/httphelper" + "github.com/huaweicloud/terraform-provider-huaweicloud/huaweicloud/helper/schemas" + "github.com/huaweicloud/terraform-provider-huaweicloud/huaweicloud/utils" +) + +func DataSourceDmsKafkaTopicProducers() *schema.Resource { + return &schema.Resource{ + ReadContext: dataSourceDmsKafkaTopicProducersRead, + + Schema: map[string]*schema.Schema{ + "region": { + Type: schema.TypeString, + Optional: true, + Computed: true, + Description: `Specifies the region in which to query the resource. If omitted, the provider-level region will be used.`, + }, + "instance_id": { + Type: schema.TypeString, + Required: true, + Description: `Specifies the instance ID.`, + }, + "topic": { + Type: schema.TypeString, + Required: true, + Description: `Specifies the topic name.`, + }, + "producers": { + Type: schema.TypeList, + Computed: true, + Description: `Indicates the producer list.`, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "producer_address": { + Type: schema.TypeString, + Computed: true, + Description: `Indicates the producer address.`, + }, + "broker_address": { + Type: schema.TypeString, + Computed: true, + Description: `Indicates the broker address.`, + }, + "join_time": { + Type: schema.TypeString, + Computed: true, + Description: `Indicates the time when the broker was connected.`, + }, + }, + }, + }, + }, + } +} + +type KafkaTopicProducersDSWrapper struct { + *schemas.ResourceDataWrapper + Config *config.Config +} + +func newKafkaTopicProducersDSWrapper(d *schema.ResourceData, meta interface{}) *KafkaTopicProducersDSWrapper { + return &KafkaTopicProducersDSWrapper{ + ResourceDataWrapper: schemas.NewSchemaWrapper(d), + Config: meta.(*config.Config), + } +} + +func dataSourceDmsKafkaTopicProducersRead(_ context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics { + wrapper := newKafkaTopicProducersDSWrapper(d, meta) + lisTopProRst, err := wrapper.ListTopicProducers() + if err != nil { + return diag.FromErr(err) + } + + err = wrapper.listTopicProducersToSchema(lisTopProRst) + if err != nil { + return diag.FromErr(err) + } + + id, err := uuid.GenerateUUID() + if err != nil { + return diag.FromErr(err) + } + d.SetId(id) + return nil +} + +// @API Kafka GET /v2/{project_id}/kafka/instances/{instance_id}/topics/{topic}/producers +func (w *KafkaTopicProducersDSWrapper) ListTopicProducers() (*gjson.Result, error) { + client, err := w.NewClient(w.Config, "dmsv2") + if err != nil { + return nil, err + } + + uri := "/v2/{project_id}/kafka/instances/{instance_id}/topics/{topic}/producers" + uri = strings.ReplaceAll(uri, "{instance_id}", w.Get("instance_id").(string)) + uri = strings.ReplaceAll(uri, "{topic}", w.Get("topic").(string)) + return httphelper.New(client). + Method("GET"). + URI(uri). + OffsetPager("producers", "offset", "limit", 0). + Request(). + Result() +} + +func (w *KafkaTopicProducersDSWrapper) listTopicProducersToSchema(body *gjson.Result) error { + d := w.ResourceData + mErr := multierror.Append(nil, + d.Set("region", w.Config.GetRegion(w.ResourceData)), + d.Set("producers", schemas.SliceToList(body.Get("producers"), + func(producers gjson.Result) any { + return map[string]any{ + "producer_address": producers.Get("producer_address").Value(), + "broker_address": producers.Get("broker_address").Value(), + "join_time": w.setProducersJoinTime(producers), + } + }, + )), + ) + return mErr.ErrorOrNil() +} + +func (*KafkaTopicProducersDSWrapper) setProducersJoinTime(data gjson.Result) string { + return utils.FormatTimeStampRFC3339((data.Get("join_time").Int())/1000, true) +}