Skip to content

Commit

Permalink
feat(dms): support to produce message for kafka (#5850)
Browse files Browse the repository at this point in the history
  • Loading branch information
saf3dfsa authored Nov 14, 2024
1 parent d9590d5 commit 7a70db5
Show file tree
Hide file tree
Showing 4 changed files with 263 additions and 0 deletions.
72 changes: 72 additions & 0 deletions docs/resources/dms_kafka_message_produce.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
---
subcategory: "Distributed Message Service (DMS)"
layout: "huaweicloud"
page_title: "HuaweiCloud: huaweicloud_dms_kafka_message_produce"
description: |-
Manages a DMS kafka message produce resource within HuaweiCloud.
---

# huaweicloud_dms_kafka_message_produce

Manages a DMS kafka message produce resource within HuaweiCloud.

## Example Usage

```hcl
variable "instance_id" {}
variable "topic_name" {}
resource "huaweicloud_dms_kafka_message_produce" "test" {
instance_id = var.instance_id
topic = var.topic_name
body = "test"
property_list {
name = "KEY"
value = "testKey"
}
property_list {
name = "PARTITION"
value = "1"
}
}
```

## Argument Reference

The following arguments are supported:

* `region` - (Optional, String, ForceNew) Specifies the region in which to create the resource.
If omitted, the provider-level region will be used.
Changing this creates a new resource.

* `instance_id` - (Required, String, ForceNew) Specifies the instance ID.
Changing this creates a new resource.

* `topic` - (Required, String, ForceNew) Specifies the topic name.
Changing this creates a new resource.

* `body` - (Required, String, ForceNew) Specifies the message content.
Changing this creates a new resource.

* `property_list` - (Optional, List, ForceNew) Specifies the topic partition information.
Changing this creates a new resource.
The [property_list](#block--property_list) structure is documented below.

<a name="block--property_list"></a>
The `property_list` block supports:

* `name` - (Required, String, ForceNew) Specifies the feature name.
+ **KEY**: Specifies the message key.
+ **PARTITION** : Specifies the partition to which the message will be sent.
Changing this creates a new resource.

* `value` - (Required, String, ForceNew) Specifies the feature value.
Changing this creates a new resource.

## Attribute Reference

In addition to all arguments above, the following attributes are exported:

* `id` - The resource ID in UUID format.
1 change: 1 addition & 0 deletions huaweicloud/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -1525,6 +1525,7 @@ func Provider() *schema.Provider {
"huaweicloud_dms_kafka_permissions": dms.ResourceDmsKafkaPermissions(),
"huaweicloud_dms_kafka_instance": dms.ResourceDmsKafkaInstance(),
"huaweicloud_dms_kafka_topic": dms.ResourceDmsKafkaTopic(),
"huaweicloud_dms_kafka_message_produce": dms.ResourceDmsKafkaMessageProduce(),
"huaweicloud_dms_kafka_partition_reassign": dms.ResourceDmsKafkaPartitionReassign(),
"huaweicloud_dms_kafka_consumer_group": dms.ResourceDmsKafkaConsumerGroup(),
"huaweicloud_dms_kafka_smart_connect": dms.ResourceDmsKafkaSmartConnect(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package dms

import (
"fmt"
"testing"

"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"

"github.com/huaweicloud/terraform-provider-huaweicloud/huaweicloud/services/acceptance"
)

func TestAccKafkaMessageProduce_basic(t *testing.T) {
rName := acceptance.RandomAccResourceNameWithDash()

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() {
acceptance.TestAccPreCheck(t)
},
ProviderFactories: acceptance.TestAccProviderFactories,
CheckDestroy: nil,
Steps: []resource.TestStep{
{
Config: testAccKafkaMessageProduce_basic(rName),
},
},
})
}

func testAccKafkaMessageProduce_basic(rName string) string {
return fmt.Sprintf(`
%[1]s
resource "huaweicloud_dms_kafka_message_produce" "test" {
depends_on = [huaweicloud_dms_kafka_topic.topic]
instance_id = huaweicloud_dms_kafka_instance.test.id
topic = huaweicloud_dms_kafka_topic.topic.name
body = "test"
property_list {
name = "KEY"
value = "testKey"
}
property_list {
name = "PARTITION"
value = "1"
}
}`, testAccDmsKafkaTopic_basic(rName))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package dms

import (
"context"
"strings"

"github.com/hashicorp/go-uuid"
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"

"github.com/chnsz/golangsdk"

"github.com/huaweicloud/terraform-provider-huaweicloud/huaweicloud/config"
"github.com/huaweicloud/terraform-provider-huaweicloud/huaweicloud/utils"
)

// @API Kafka POST /v2/{project_id}/instances/{instance_id}/messages/action
func ResourceDmsKafkaMessageProduce() *schema.Resource {
return &schema.Resource{
CreateContext: resourceDmsKafkaMessageProduceCreate,
ReadContext: resourceDmsKafkaMessageProduceRead,
DeleteContext: resourceDmsKafkaMessageProduceDelete,

Schema: map[string]*schema.Schema{
"region": {
Type: schema.TypeString,
Optional: true,
Computed: true,
ForceNew: true,
},
"instance_id": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
},
"topic": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
},
"body": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
},
"property_list": {
Type: schema.TypeList,
Optional: true,
ForceNew: true,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"name": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
},
"value": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
},
},
},
},
},
}
}

func resourceDmsKafkaMessageProduceCreate(_ context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
cfg := meta.(*config.Config)
region := cfg.GetRegion(d)
client, err := cfg.NewServiceClient("dmsv2", region)
if err != nil {
return diag.Errorf("error creating DMS client: %s", err)
}

createHttpUrl := "v2/{project_id}/instances/{instance_id}/messages/action?action_id={action_id}"
createPath := client.Endpoint + createHttpUrl
createPath = strings.ReplaceAll(createPath, "{project_id}", client.ProjectID)
createPath = strings.ReplaceAll(createPath, "{instance_id}", d.Get("instance_id").(string))
createPath = strings.ReplaceAll(createPath, "{action_id}", "send")

createOpt := golangsdk.RequestOpts{
KeepResponseBody: true,
JSONBody: utils.RemoveNil(buildCreateKafkaMessageProduceBodyParams(d)),
}

_, err = client.Request("POST", createPath, &createOpt)
if err != nil {
return diag.Errorf("error producing kafka topic message: %s", err)
}

id, err := uuid.GenerateUUID()
if err != nil {
return diag.Errorf("unable to generate ID: %s", err)
}
d.SetId(id)

return nil
}

func buildCreateKafkaMessageProduceBodyParams(d *schema.ResourceData) map[string]interface{} {
bodyParams := map[string]interface{}{
"topic": d.Get("topic"),
"body": d.Get("body"),
"property_list": buildCreateMessageBodyParamsPropertyList(d.Get("property_list").([]interface{})),
}
return bodyParams
}

func buildCreateMessageBodyParamsPropertyList(rawParams []interface{}) []map[string]interface{} {
if len(rawParams) == 0 {
return nil
}
rst := make([]map[string]interface{}, 0, len(rawParams))
for _, val := range rawParams {
raw := val.(map[string]interface{})
params := map[string]interface{}{
"name": raw["name"],
"value": raw["value"],
}
rst = append(rst, params)
}

return rst
}

func resourceDmsKafkaMessageProduceRead(_ context.Context, _ *schema.ResourceData, _ interface{}) diag.Diagnostics {
return nil
}

func resourceDmsKafkaMessageProduceDelete(_ context.Context, _ *schema.ResourceData, _ interface{}) diag.Diagnostics {
errorMsg := "Deleting resource is not supported. The resource is only removed from the state, the message remains in the cloud."
return diag.Diagnostics{
diag.Diagnostic{
Severity: diag.Warning,
Summary: errorMsg,
},
}
}

0 comments on commit 7a70db5

Please sign in to comment.