diff --git a/pkg/shared/clients/nats/client_pool_test.go b/pkg/shared/clients/nats/client_pool_test.go index 8b88b50850..426f0a0371 100644 --- a/pkg/shared/clients/nats/client_pool_test.go +++ b/pkg/shared/clients/nats/client_pool_test.go @@ -27,7 +27,7 @@ import ( ) func TestNewClientPool_Success(t *testing.T) { - os.Setenv(dfv1.EnvISBSvcJetStreamURL, "nats://localhost:4222") + os.Setenv(dfv1.EnvISBSvcJetStreamURL, "nats://nats:4222") os.Setenv(dfv1.EnvISBSvcJetStreamUser, "user") os.Setenv(dfv1.EnvISBSvcJetStreamPassword, "password") ctx := context.Background() @@ -39,7 +39,7 @@ func TestNewClientPool_Success(t *testing.T) { } func TestClientPool_NextAvailableClient(t *testing.T) { - os.Setenv(dfv1.EnvISBSvcJetStreamURL, "nats://localhost:4222") + os.Setenv(dfv1.EnvISBSvcJetStreamURL, "nats://nats:4222") os.Setenv(dfv1.EnvISBSvcJetStreamUser, "user") os.Setenv(dfv1.EnvISBSvcJetStreamPassword, "password") ctx := context.Background() @@ -58,7 +58,7 @@ func TestClientPool_NextAvailableClient(t *testing.T) { } func TestClientPool_CloseAll(t *testing.T) { - os.Setenv(dfv1.EnvISBSvcJetStreamURL, "nats://localhost:4222") + os.Setenv(dfv1.EnvISBSvcJetStreamURL, "nats://nats:4222") os.Setenv(dfv1.EnvISBSvcJetStreamUser, "user") os.Setenv(dfv1.EnvISBSvcJetStreamPassword, "password") ctx := context.Background() diff --git a/pkg/shared/clients/nats/nats_client_test.go b/pkg/shared/clients/nats/nats_client_test.go index 1162e38d3a..81646a5f88 100644 --- a/pkg/shared/clients/nats/nats_client_test.go +++ b/pkg/shared/clients/nats/nats_client_test.go @@ -32,7 +32,7 @@ import ( func TestNewNATSClient(t *testing.T) { // Setting up environment variables for the test - os.Setenv(dfv1.EnvISBSvcJetStreamURL, "nats://localhost:4222") + os.Setenv(dfv1.EnvISBSvcJetStreamURL, "nats://nats:4222") os.Setenv(dfv1.EnvISBSvcJetStreamUser, "user") os.Setenv(dfv1.EnvISBSvcJetStreamPassword, "password") defer os.Clearenv() diff --git a/rust/numaflow-core/src/config/pipeline.rs b/rust/numaflow-core/src/config/pipeline.rs index aa3921b272..0e0e70f625 100644 --- a/rust/numaflow-core/src/config/pipeline.rs +++ b/rust/numaflow-core/src/config/pipeline.rs @@ -479,7 +479,7 @@ mod tests { fn test_pipeline_config_load_sink_vertex() { let pipeline_cfg_base64 = "eyJtZXRhZGF0YSI6eyJuYW1lIjoic2ltcGxlLXBpcGVsaW5lLW91dCIsIm5hbWVzcGFjZSI6ImRlZmF1bHQiLCJjcmVhdGlvblRpbWVzdGFtcCI6bnVsbH0sInNwZWMiOnsibmFtZSI6Im91dCIsInNpbmsiOnsiYmxhY2tob2xlIjp7fSwicmV0cnlTdHJhdGVneSI6eyJvbkZhaWx1cmUiOiJyZXRyeSJ9fSwibGltaXRzIjp7InJlYWRCYXRjaFNpemUiOjUwMCwicmVhZFRpbWVvdXQiOiIxcyIsImJ1ZmZlck1heExlbmd0aCI6MzAwMDAsImJ1ZmZlclVzYWdlTGltaXQiOjgwfSwic2NhbGUiOnsibWluIjoxfSwidXBkYXRlU3RyYXRlZ3kiOnsidHlwZSI6IlJvbGxpbmdVcGRhdGUiLCJyb2xsaW5nVXBkYXRlIjp7Im1heFVuYXZhaWxhYmxlIjoiMjUlIn19LCJwaXBlbGluZU5hbWUiOiJzaW1wbGUtcGlwZWxpbmUiLCJpbnRlclN0ZXBCdWZmZXJTZXJ2aWNlTmFtZSI6IiIsInJlcGxpY2FzIjowLCJmcm9tRWRnZXMiOlt7ImZyb20iOiJpbiIsInRvIjoib3V0IiwiY29uZGl0aW9ucyI6bnVsbCwiZnJvbVZlcnRleFR5cGUiOiJTb3VyY2UiLCJmcm9tVmVydGV4UGFydGl0aW9uQ291bnQiOjEsImZyb21WZXJ0ZXhMaW1pdHMiOnsicmVhZEJhdGNoU2l6ZSI6NTAwLCJyZWFkVGltZW91dCI6IjFzIiwiYnVmZmVyTWF4TGVuZ3RoIjozMDAwMCwiYnVmZmVyVXNhZ2VMaW1pdCI6ODB9LCJ0b1ZlcnRleFR5cGUiOiJTaW5rIiwidG9WZXJ0ZXhQYXJ0aXRpb25Db3VudCI6MSwidG9WZXJ0ZXhMaW1pdHMiOnsicmVhZEJhdGNoU2l6ZSI6NTAwLCJyZWFkVGltZW91dCI6IjFzIiwiYnVmZmVyTWF4TGVuZ3RoIjozMDAwMCwiYnVmZmVyVXNhZ2VMaW1pdCI6ODB9fV0sIndhdGVybWFyayI6eyJtYXhEZWxheSI6IjBzIn19LCJzdGF0dXMiOnsicGhhc2UiOiIiLCJyZXBsaWNhcyI6MCwiZGVzaXJlZFJlcGxpY2FzIjowLCJsYXN0U2NhbGVkQXQiOm51bGx9fQ==".to_string(); - let env_vars = [("NUMAFLOW_ISBSVC_JETSTREAM_URL", "localhost:4222")]; + let env_vars = [("NUMAFLOW_ISBSVC_JETSTREAM_URL", "nats:4222")]; let pipeline_config = PipelineConfig::load(pipeline_cfg_base64, env_vars).unwrap(); let expected = PipelineConfig { @@ -490,7 +490,7 @@ mod tests { paf_concurrency: 1000, read_timeout: Duration::from_secs(1), js_client_config: isb::jetstream::ClientConfig { - url: "localhost:4222".to_string(), + url: "nats:4222".to_string(), user: None, password: None, }, @@ -526,7 +526,7 @@ mod tests { fn test_pipeline_config_load_all() { let pipeline_cfg_base64 = "eyJtZXRhZGF0YSI6eyJuYW1lIjoic2ltcGxlLXBpcGVsaW5lLWluIiwibmFtZXNwYWNlIjoiZGVmYXVsdCIsImNyZWF0aW9uVGltZXN0YW1wIjpudWxsfSwic3BlYyI6eyJuYW1lIjoiaW4iLCJzb3VyY2UiOnsiZ2VuZXJhdG9yIjp7InJwdSI6MTAwMDAwLCJkdXJhdGlvbiI6IjFzIiwibXNnU2l6ZSI6OCwiaml0dGVyIjoiMHMifX0sImNvbnRhaW5lclRlbXBsYXRlIjp7InJlc291cmNlcyI6e30sImVudiI6W3sibmFtZSI6IlBBRl9CQVRDSF9TSVpFIiwidmFsdWUiOiIxMDAwMDAifV19LCJsaW1pdHMiOnsicmVhZEJhdGNoU2l6ZSI6MTAwMCwicmVhZFRpbWVvdXQiOiIxcyIsImJ1ZmZlck1heExlbmd0aCI6MTUwMDAwLCJidWZmZXJVc2FnZUxpbWl0Ijo4NX0sInNjYWxlIjp7Im1pbiI6MX0sInVwZGF0ZVN0cmF0ZWd5Ijp7InR5cGUiOiJSb2xsaW5nVXBkYXRlIiwicm9sbGluZ1VwZGF0ZSI6eyJtYXhVbmF2YWlsYWJsZSI6IjI1JSJ9fSwicGlwZWxpbmVOYW1lIjoic2ltcGxlLXBpcGVsaW5lIiwiaW50ZXJTdGVwQnVmZmVyU2VydmljZU5hbWUiOiIiLCJyZXBsaWNhcyI6MCwidG9FZGdlcyI6W3siZnJvbSI6ImluIiwidG8iOiJvdXQiLCJjb25kaXRpb25zIjpudWxsLCJmcm9tVmVydGV4VHlwZSI6IlNvdXJjZSIsImZyb21WZXJ0ZXhQYXJ0aXRpb25Db3VudCI6MSwiZnJvbVZlcnRleExpbWl0cyI6eyJyZWFkQmF0Y2hTaXplIjoxMDAwLCJyZWFkVGltZW91dCI6IjFzIiwiYnVmZmVyTWF4TGVuZ3RoIjoxNTAwMDAsImJ1ZmZlclVzYWdlTGltaXQiOjg1fSwidG9WZXJ0ZXhUeXBlIjoiU2luayIsInRvVmVydGV4UGFydGl0aW9uQ291bnQiOjEsInRvVmVydGV4TGltaXRzIjp7InJlYWRCYXRjaFNpemUiOjEwMDAsInJlYWRUaW1lb3V0IjoiMXMiLCJidWZmZXJNYXhMZW5ndGgiOjE1MDAwMCwiYnVmZmVyVXNhZ2VMaW1pdCI6ODV9fV0sIndhdGVybWFyayI6eyJkaXNhYmxlZCI6dHJ1ZSwibWF4RGVsYXkiOiIwcyJ9fSwic3RhdHVzIjp7InBoYXNlIjoiIiwicmVwbGljYXMiOjAsImRlc2lyZWRSZXBsaWNhcyI6MCwibGFzdFNjYWxlZEF0IjpudWxsfX0="; - let env_vars = [("NUMAFLOW_ISBSVC_JETSTREAM_URL", "localhost:4222")]; + let env_vars = [("NUMAFLOW_ISBSVC_JETSTREAM_URL", "nats:4222")]; let pipeline_config = PipelineConfig::load(pipeline_cfg_base64.to_string(), env_vars).unwrap(); @@ -538,7 +538,7 @@ mod tests { paf_concurrency: 1000, read_timeout: Duration::from_secs(1), js_client_config: isb::jetstream::ClientConfig { - url: "localhost:4222".to_string(), + url: "nats:4222".to_string(), user: None, password: None, }, @@ -579,7 +579,7 @@ mod tests { fn test_pipeline_config_pulsar_source() { let pipeline_cfg_base64 = "eyJtZXRhZGF0YSI6eyJuYW1lIjoic2ltcGxlLXBpcGVsaW5lLWluIiwibmFtZXNwYWNlIjoiZGVmYXVsdCIsImNyZWF0aW9uVGltZXN0YW1wIjpudWxsfSwic3BlYyI6eyJuYW1lIjoiaW4iLCJzb3VyY2UiOnsicHVsc2FyIjp7InNlcnZlckFkZHIiOiJwdWxzYXI6Ly9wdWxzYXItc2VydmljZTo2NjUwIiwidG9waWMiOiJ0ZXN0X3BlcnNpc3RlbnQiLCJjb25zdW1lck5hbWUiOiJteV9wZXJzaXN0ZW50X2NvbnN1bWVyIiwic3Vic2NyaXB0aW9uTmFtZSI6Im15X3BlcnNpc3RlbnRfc3Vic2NyaXB0aW9uIn19LCJsaW1pdHMiOnsicmVhZEJhdGNoU2l6ZSI6NTAsInJlYWRUaW1lb3V0IjoiMXMiLCJidWZmZXJNYXhMZW5ndGgiOjMwMDAwLCJidWZmZXJVc2FnZUxpbWl0Ijo4MH0sInNjYWxlIjp7Im1pbiI6MSwibWF4IjoxfSwidXBkYXRlU3RyYXRlZ3kiOnsidHlwZSI6IlJvbGxpbmdVcGRhdGUiLCJyb2xsaW5nVXBkYXRlIjp7Im1heFVuYXZhaWxhYmxlIjoiMjUlIn19LCJwaXBlbGluZU5hbWUiOiJzaW1wbGUtcGlwZWxpbmUiLCJpbnRlclN0ZXBCdWZmZXJTZXJ2aWNlTmFtZSI6IiIsInJlcGxpY2FzIjowLCJ0b0VkZ2VzIjpbeyJmcm9tIjoiaW4iLCJ0byI6Im91dCIsImNvbmRpdGlvbnMiOm51bGwsImZyb21WZXJ0ZXhUeXBlIjoiU291cmNlIiwiZnJvbVZlcnRleFBhcnRpdGlvbkNvdW50IjoxLCJmcm9tVmVydGV4TGltaXRzIjp7InJlYWRCYXRjaFNpemUiOjUwLCJyZWFkVGltZW91dCI6IjFzIiwiYnVmZmVyTWF4TGVuZ3RoIjozMDAwMCwiYnVmZmVyVXNhZ2VMaW1pdCI6ODB9LCJ0b1ZlcnRleFR5cGUiOiJTaW5rIiwidG9WZXJ0ZXhQYXJ0aXRpb25Db3VudCI6MSwidG9WZXJ0ZXhMaW1pdHMiOnsicmVhZEJhdGNoU2l6ZSI6NTAsInJlYWRUaW1lb3V0IjoiMXMiLCJidWZmZXJNYXhMZW5ndGgiOjMwMDAwLCJidWZmZXJVc2FnZUxpbWl0Ijo4MH19XSwid2F0ZXJtYXJrIjp7Im1heERlbGF5IjoiMHMifX0sInN0YXR1cyI6eyJwaGFzZSI6IiIsInJlcGxpY2FzIjowLCJkZXNpcmVkUmVwbGljYXMiOjAsImxhc3RTY2FsZWRBdCI6bnVsbH19"; - let env_vars = [("NUMAFLOW_ISBSVC_JETSTREAM_URL", "localhost:4222")]; + let env_vars = [("NUMAFLOW_ISBSVC_JETSTREAM_URL", "nats:4222")]; let pipeline_config = PipelineConfig::load(pipeline_cfg_base64.to_string(), env_vars).unwrap(); @@ -591,7 +591,7 @@ mod tests { paf_concurrency: 1000, read_timeout: Duration::from_secs(1), js_client_config: isb::jetstream::ClientConfig { - url: "localhost:4222".to_string(), + url: "nats:4222".to_string(), user: None, password: None, }, @@ -702,7 +702,7 @@ mod tests { fn test_pipeline_config_load_map_vertex() { let pipeline_cfg_base64 = "eyJtZXRhZGF0YSI6eyJuYW1lIjoic2ltcGxlLXBpcGVsaW5lLW1hcCIsIm5hbWVzcGFjZSI6ImRlZmF1bHQiLCJjcmVhdGlvblRpbWVzdGFtcCI6bnVsbH0sInNwZWMiOnsibmFtZSI6Im1hcCIsInVkZiI6eyJjb250YWluZXIiOnsidGVtcGxhdGUiOiJkZWZhdWx0In19LCJsaW1pdHMiOnsicmVhZEJhdGNoU2l6ZSI6NTAwLCJyZWFkVGltZW91dCI6IjFzIiwiYnVmZmVyTWF4TGVuZ3RoIjozMDAwMCwiYnVmZmVyVXNhZ2VMaW1pdCI6ODB9LCJzY2FsZSI6eyJtaW4iOjF9LCJwaXBlbGluZU5hbWUiOiJzaW1wbGUtcGlwZWxpbmUiLCJpbnRlclN0ZXBCdWZmZXJTZXJ2aWNlTmFtZSI6IiIsInJlcGxpY2FzIjowLCJmcm9tRWRnZXMiOlt7ImZyb20iOiJpbiIsInRvIjoibWFwIiwiY29uZGl0aW9ucyI6bnVsbCwiZnJvbVZlcnRleFR5cGUiOiJTb3VyY2UiLCJmcm9tVmVydGV4UGFydGl0aW9uQ291bnQiOjEsImZyb21WZXJ0ZXhMaW1pdHMiOnsicmVhZEJhdGNoU2l6ZSI6NTAwLCJyZWFkVGltZW91dCI6IjFzIiwiYnVmZmVyTWF4TGVuZ3RoIjozMDAwMCwiYnVmZmVyVXNhZ2VMaW1pdCI6ODB9LCJ0b1ZlcnRleFR5cGUiOiJNYXAiLCJ0b1ZlcnRleFBhcnRpdGlvbkNvdW50IjoxLCJ0b1ZlcnRleExpbWl0cyI6eyJyZWFkQmF0Y2hTaXplIjo1MDAsInJlYWRUaW1lb3V0IjoiMXMiLCJidWZmZXJNYXhMZW5ndGgiOjMwMDAwLCJidWZmZXJVc2FnZUxpbWl0Ijo4MH19XSwid2F0ZXJtYXJrIjp7Im1heERlbGF5IjoiMHMifX0sInN0YXR1cyI6eyJwaGFzZSI6IiIsInJlcGxpY2FzIjowLCJkZXNpcmVkUmVwbGljYXMiOjAsImxhc3RTY2FsZWRBdCI6bnVsbH19"; - let env_vars = [("NUMAFLOW_ISBSVC_JETSTREAM_URL", "localhost:4222")]; + let env_vars = [("NUMAFLOW_ISBSVC_JETSTREAM_URL", "nats:4222")]; let pipeline_config = PipelineConfig::load(pipeline_cfg_base64.to_string(), env_vars).unwrap(); @@ -714,7 +714,7 @@ mod tests { paf_concurrency: 1000, read_timeout: Duration::from_secs(1), js_client_config: isb::jetstream::ClientConfig { - url: "localhost:4222".to_string(), + url: "nats:4222".to_string(), user: None, password: None, }, diff --git a/rust/numaflow-core/src/config/pipeline/isb.rs b/rust/numaflow-core/src/config/pipeline/isb.rs index 50da134e87..806060e289 100644 --- a/rust/numaflow-core/src/config/pipeline/isb.rs +++ b/rust/numaflow-core/src/config/pipeline/isb.rs @@ -10,7 +10,7 @@ const DEFAULT_BUFFER_FULL_STRATEGY: BufferFullStrategy = BufferFullStrategy::Ret const DEFAULT_WIP_ACK_INTERVAL_MILLIS: u64 = 1000; pub(crate) mod jetstream { - const DEFAULT_URL: &str = "localhost:4222"; + const DEFAULT_URL: &str = "nats:4222"; #[derive(Debug, Clone, PartialEq)] pub(crate) struct ClientConfig { pub url: String, @@ -102,7 +102,7 @@ mod jetstream_client_config { #[test] fn test_default_client_config() { let expected_config = ClientConfig { - url: "localhost:4222".to_string(), + url: "nats:4222".to_string(), user: None, password: None, }; diff --git a/rust/numaflow-core/src/pipeline.rs b/rust/numaflow-core/src/pipeline.rs index 652f1c8181..73c00f9df4 100644 --- a/rust/numaflow-core/src/pipeline.rs +++ b/rust/numaflow-core/src/pipeline.rs @@ -397,7 +397,7 @@ mod tests { "default-test-forwarder-for-source-vertex-out-4", ]; - let js_url = "localhost:4222"; + let js_url = "nats:4222"; let client = async_nats::connect(js_url).await.unwrap(); let context = jetstream::new(client); @@ -442,7 +442,7 @@ mod tests { paf_concurrency: 30000, read_timeout: Duration::from_secs(1), js_client_config: isb::jetstream::ClientConfig { - url: "localhost:4222".to_string(), + url: "nats:4222".to_string(), user: None, password: None, }, @@ -539,7 +539,7 @@ mod tests { "default-test-forwarder-for-sink-vertex-out-4", ]; - let js_url = "localhost:4222"; + let js_url = "nats:4222"; let client = async_nats::connect(js_url).await.unwrap(); let context = jetstream::new(client); @@ -611,7 +611,7 @@ mod tests { paf_concurrency: 1000, read_timeout: Duration::from_secs(1), js_client_config: isb::jetstream::ClientConfig { - url: "localhost:4222".to_string(), + url: "nats:4222".to_string(), user: None, password: None, }, @@ -736,7 +736,7 @@ mod tests { "default-test-forwarder-for-map-vertex-out-4", ]; - let js_url = "localhost:4222"; + let js_url = "nats:4222"; let client = async_nats::connect(js_url).await.unwrap(); let context = jetstream::new(client); @@ -840,7 +840,7 @@ mod tests { paf_concurrency: 1000, read_timeout: Duration::from_secs(1), js_client_config: isb::jetstream::ClientConfig { - url: "localhost:4222".to_string(), + url: "nats:4222".to_string(), user: None, password: None, }, diff --git a/rust/numaflow-core/src/pipeline/forwarder/source_forwarder.rs b/rust/numaflow-core/src/pipeline/forwarder/source_forwarder.rs index a14ab7b4c8..9fe6de8805 100644 --- a/rust/numaflow-core/src/pipeline/forwarder/source_forwarder.rs +++ b/rust/numaflow-core/src/pipeline/forwarder/source_forwarder.rs @@ -229,7 +229,7 @@ mod tests { ); // create a js writer - let js_url = "localhost:4222"; + let js_url = "nats:4222"; // Create JetStream context let client = async_nats::connect(js_url).await.unwrap(); let context = jetstream::new(client); diff --git a/rust/numaflow-core/src/pipeline/isb/jetstream/reader.rs b/rust/numaflow-core/src/pipeline/isb/jetstream/reader.rs index 97aa5f9864..588b177cc5 100644 --- a/rust/numaflow-core/src/pipeline/isb/jetstream/reader.rs +++ b/rust/numaflow-core/src/pipeline/isb/jetstream/reader.rs @@ -301,7 +301,7 @@ mod tests { #[cfg(feature = "nats-tests")] #[tokio::test] async fn test_jetstream_read() { - let js_url = "localhost:4222"; + let js_url = "nats:4222"; // Create JetStream context let client = async_nats::connect(js_url).await.unwrap(); let context = jetstream::new(client); @@ -398,7 +398,7 @@ mod tests { #[cfg(feature = "nats-tests")] #[tokio::test] async fn test_jetstream_ack() { - let js_url = "localhost:4222"; + let js_url = "nats:4222"; // Create JetStream context let client = async_nats::connect(js_url).await.unwrap(); let context = jetstream::new(client); diff --git a/rust/numaflow-core/src/pipeline/isb/jetstream/writer.rs b/rust/numaflow-core/src/pipeline/isb/jetstream/writer.rs index 063af6391a..7a26adaf95 100644 --- a/rust/numaflow-core/src/pipeline/isb/jetstream/writer.rs +++ b/rust/numaflow-core/src/pipeline/isb/jetstream/writer.rs @@ -484,7 +484,7 @@ mod tests { async fn test_async_write() { let tracker_handle = TrackerHandle::new(None); let cln_token = CancellationToken::new(); - let js_url = "localhost:4222"; + let js_url = "nats:4222"; // Create JetStream context let client = async_nats::connect(js_url).await.unwrap(); let context = jetstream::new(client); @@ -559,7 +559,7 @@ mod tests { #[tokio::test] async fn test_sync_write() { let cln_token = CancellationToken::new(); - let js_url = "localhost:4222"; + let js_url = "nats:4222"; // Create JetStream context let client = async_nats::connect(js_url).await.unwrap(); let context = jetstream::new(client); @@ -623,7 +623,7 @@ mod tests { #[tokio::test] async fn test_write_with_cancellation() { let tracker_handle = TrackerHandle::new(None); - let js_url = "localhost:4222"; + let js_url = "nats:4222"; // Create JetStream context let client = async_nats::connect(js_url).await.unwrap(); let context = jetstream::new(client); @@ -747,7 +747,7 @@ mod tests { #[cfg(feature = "nats-tests")] #[tokio::test] async fn test_fetch_buffer_usage() { - let js_url = "localhost:4222"; + let js_url = "nats:4222"; // Create JetStream context let client = async_nats::connect(js_url).await.unwrap(); let context = jetstream::new(client); @@ -823,7 +823,7 @@ mod tests { #[tokio::test] async fn test_check_stream_status() { let tracker_handle = TrackerHandle::new(None); - let js_url = "localhost:4222"; + let js_url = "nats:4222"; // Create JetStream context let client = async_nats::connect(js_url).await.unwrap(); let context = jetstream::new(client); @@ -916,7 +916,7 @@ mod tests { #[tokio::test] async fn test_streaming_write() { let cln_token = CancellationToken::new(); - let js_url = "localhost:4222"; + let js_url = "nats:4222"; // Create JetStream context let client = async_nats::connect(js_url).await.unwrap(); let context = jetstream::new(client); @@ -1002,7 +1002,7 @@ mod tests { #[cfg(feature = "nats-tests")] #[tokio::test] async fn test_streaming_write_with_cancellation() { - let js_url = "localhost:4222"; + let js_url = "nats:4222"; // Create JetStream context let client = async_nats::connect(js_url).await.unwrap(); let context = jetstream::new(client); @@ -1118,7 +1118,7 @@ mod tests { #[cfg(feature = "nats-tests")] #[tokio::test] async fn test_streaming_write_multiple_streams_vertices() { - let js_url = "localhost:4222"; + let js_url = "nats:4222"; let client = async_nats::connect(js_url).await.unwrap(); let context = jetstream::new(client); let tracker_handle = TrackerHandle::new(None);