Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

There's has no way to cancel a running query using context's cancel functions. #1388

Open
9 tasks
tinybit opened this issue Aug 27, 2024 · 1 comment
Open
9 tasks
Assignees
Labels

Comments

@tinybit
Copy link
Collaborator

tinybit commented Aug 27, 2024

Observed

Suppose we have a query that takes more than 10s to run.

Example 1: query will timeout after 10s, even though cancel() is called after 5s.

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)

go func() {
   <...>
   err = conn.Exec(ctx, query)
   <...>
}

go func() {
    time.Sleep(5 * time.Second)
    cancel()
}()

Example 2: query will timeout after 5m, even though cancel() is called after 5s.

ctx, cancel := context.WithCancel(context.Background())

go func() {
   <...>
   err = conn.Exec(ctx, query)
   <...>
}

go func() {
    time.Sleep(5 * time.Second)
    cancel()
}()

There's full code example below. It outputs:

2024/08/29 15:51:19 Connected.
2024/08/29 15:51:26 Running heavy query...
2024/08/29 15:51:27 Context cancelled after 1s.
2024/08/29 15:51:32 Query took: 6.051290167s
2024/08/29 15:51:32 Done.

Note, that query execution takes 6+ seconds, when it should be around 1s, as per context cancellation request.

Expected behaviour

Stop query execution immediately after context was cancelled.

Results after patch applied, note difference 1s vs 1.001692375s:

2024/08/29 15:58:16 Connected.
2024/08/29 15:58:22 Running heavy query...
2024/08/29 15:58:23 Context cancelled after 1s.
2024/08/29 15:58:23 Finished with error: context canceled
2024/08/29 15:58:23 Query took: 1.001692375s
2024/08/29 15:58:23 Done.

Code example

package main

import (
	"context"
	"crypto/tls"
	"log"
	"time"

	"github.com/ClickHouse/clickhouse-go/v2"
)

var (
	q1 = "CREATE DATABASE IF NOT EXISTS test_query_cancellation"
	q2 = "DROP TABLE IF EXISTS test_query_cancellation.trips"
	q3 = `CREATE TABLE test_query_cancellation.trips (
		trip_id             UInt32,
		pickup_datetime     DateTime,
		dropoff_datetime    DateTime,
		pickup_longitude    Nullable(Float64),
		pickup_latitude     Nullable(Float64),
		dropoff_longitude   Nullable(Float64),
		dropoff_latitude    Nullable(Float64),
		passenger_count     UInt8,
		trip_distance       Float32,
		fare_amount         Float32,
		extra               Float32,
		tip_amount          Float32,
		tolls_amount        Float32,
		total_amount        Float32,
		payment_type        Enum('CSH' = 1, 'CRE' = 2, 'NOC' = 3, 'DIS' = 4, 'UNK' = 5),
		pickup_ntaname      LowCardinality(String),
		dropoff_ntaname     LowCardinality(String)
	)
	ENGINE = MergeTree
	PRIMARY KEY (pickup_datetime, dropoff_datetime);`
	q4 = `INSERT INTO test_query_cancellation.trips
	SELECT
		trip_id,
		pickup_datetime,
		dropoff_datetime,
		pickup_longitude,
		pickup_latitude,
		dropoff_longitude,
		dropoff_latitude,
		passenger_count,
		trip_distance,
		fare_amount,
		extra,
		tip_amount,
		tolls_amount,
		total_amount,
		payment_type,
		pickup_ntaname,
		dropoff_ntaname
	FROM s3(
		'https://datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi/trips_{0..2}.gz',
		'TabSeparatedWithNames'
	);`
)

func main() {
	prepareQueries := []string{q1, q2, q3, q4}

	// connect
	options := &clickhouse.Options{
		Addr: []string{"o2q8185ekk.us-east-2.aws.clickhouse-staging.com:9440"},
		Auth: clickhouse.Auth{
			Database: "default",
			Username: "default",
			Password: "pg0H6o0ikVyJFbu9KgYKXTII1u5HHdLLsvCg18MI7rSsf4ukLFUZcNzMoMjFfQaY",
		},
		Compression: &clickhouse.Compression{
			Method: clickhouse.CompressionLZ4,
		},
		TLS: &tls.Config{
			InsecureSkipVerify: false,
		},
		MaxOpenConns: 1,
		MaxIdleConns: 1,
	}

	conn, err := clickhouse.Open(options)
	if err != nil {
		return
	}

	if err = conn.Ping(context.Background()); err != nil {
		return
	}

	log.Println("Connected.")

	// prepare table
	for _, query := range prepareQueries {
		err = conn.Exec(context.Background(), query)
		if err != nil {
			log.Printf("Finished with error: %v\n", err)
			conn.Close()
			return
		}
	}

	// prepare context
	ctx, cancelCtx := context.WithCancel(context.Background())
	defer cancelCtx()

	doneCh := make(chan bool, 1)

	// run query in background
	go func() {
		log.Println("Running heavy query...")

		start := time.Now()

		defer func() {
			log.Printf("Query took: %v\n", time.Since(start))
			doneCh <- true
		}()

		err = conn.Exec(ctx, "OPTIMIZE TABLE test_query_cancellation.trips FINAL")
		if err != nil {
			log.Printf("Finished with error: %v\n", err)
			return
		}
	}()

	// let workers run for awhile and stop
	go func() {
		cancelBackoff := 1 * time.Second
		time.Sleep(cancelBackoff)
		cancelCtx()
		log.Printf("Context cancelled after %v.", cancelBackoff)
	}()

	<-doneCh

	conn.Close()
	log.Println("Done.")
}

Details

Environment

  • clickhouse-go version: v2.28.1
  • Interface: ClickHouse API / database/sql compatible driver
  • Go version: go1.21, go1.22, go1.23
  • Operating system: MacOS
  • ClickHouse version: 24.6.1.4287
  • Is it a ClickHouse Cloud? Y
  • ClickHouse Server non-default settings, if any:
  • CREATE TABLE statements for tables involved: 0
  • Sample data for all these tables, use clickhouse-obfuscator if necessary
@zdyj3170101136
Copy link

Before the modification, clickhouse.conn and *sql.DB could not control the timeout through context.Cancel?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

6 participants
@jkaflik @tinybit @zdyj3170101136 and others