This is a data plane SDK (it is for interacting with Azure Data Explorer service). For the control plane (resource administration), go here
- Fixed issue with queued ingestion to other clouds
- [BREAKING] - The minimal Go version is now 1.19
- [BREAKING] - Moving to a connection-string based approach to creating and authenticating clients.
This change aligns the Go SDK with the others, and gives the option to re-use connection strings between SDKs.
As part of this change, use of go-autorest based authentication is deprecated in favor of Azure Identity.
To initialize a client:
// OLD WAY - REMOVED
authConfig := auth.NewClientCredentialsConfig("clientID", "clientSecret", "tenantID")
client, err := kusto.New("endpoint", kusto.Authorization{Config: authConfig})
// NEW WAY
kcsb := kusto.NewConnectionStringBuilder(`endpoint`).WithAadAppKey("clientID", "clientSecret", "tenantID")
client, err := kusto.New(kcsb)
- [BREAKING] - Upgraded the azblob library to 0.6.1. This solves compatibility issues with other libraries, but might cause errors for those who still depend on the old version.
- Implicit cloud detection.
- All of our operations now share the same HTTP client inside the kusto client object.
Using the option WithHttpClient will use the passed HTTP client for all of the SDK's requests, granting support for configuring proxies and other HTTP related settings.
- Fixed various goroutine leaks. Now there are automatic tests to make sure we are not leaking resources.
- Fetching ingestion resources is now done more consistently, without blocking the user.
- Removed the header caching mechanism from streaming ingestion, as it was using a lot of memory for no major benefit.
- Setting a mapping now implies the ingestion format
- Fixed possible context race com/Azure/pull/134
- JSON parsing errors now display the failed JSON string
- E2E tests require fewer prerequisites
- Deprecate AllowWrite - now it is the default like in other SDKs.
- Remove mutex from query client. Now queries can run in parallel, achieving much better performance.
- Fix Column.Type assignment. Was using string, now using types. by @jesseward
- Lint and test fixes
- Added Application and User as ClientRequestProperties to set the x-ms-app and x-ms-user headers, and the matching fields in .show queries.
- Add all missing client request properties, and the ability to use custom ones using CustomQueryOption
- Add the option to not parse the response when querying, but to receive the JSON directly - QueryToJson
- Various lint fixes and code improvements
- Make clients closeable
- Support port in http host
- Add retry mechanism for throttled requests
- Added custom http options for all clients
- Ingestion.Stream has been deprecated in favor of dedicated streaming clients - ingest.Streaming and ingest.Managed. This API was very limited - it required you to create a queued ingestion client, it only accepted a byte array, and had no customization options.
- RowIterator.Next and RowIterator.Do are now deprecated and replaced by RowIterator.NextRowOrError and RowIterator.DoOnRowOrError. In previous versions, when encountering an error in-line with the results (also known as partial success), the SDK panicked. Now RowIterator.Next and RowIterator.Do will return the first error they encounter, including in-line errors or partial successes, and finish. This means that there could be extra data that will be skipped when using these APIs. Fixed #81
- ingest.Streaming and ingest.Managed were added to the SDK. Their interface is identical to ingest.Ingestion (in fact - they all share an interface, Ingestor), and they are created via ingest.NewStreaming and ingest.NewManaged respectively.
  - ingest.Streaming uses streaming ingestion to ingest data to kusto. It supports ingesting from a file or a Reader, but not a blob.
  - ingest.Managed is a managed streaming ingest client. It will try to use streaming ingest, but on transient failures and other conditions will fall back to queued ingestion.
- As mentioned before, RowIterator.Next and RowIterator.Do are now deprecated and replaced by RowIterator.NextRowOrError and RowIterator.DoOnRowOrError. The new APIs will act the same as the old, with the following changes:
  - RowIterator.NextRowOrError will return an additional inlineError value. When it's non-nil, it indicates an inline error. After encountering it, the iteration doesn't end and you should continue consuming the iterator.
  - RowIterator.DoOnRowOrError requires an additional parameter of type *errors.Error, which indicates an inline error. It can be ignored or handled, and as long as the callback returns nil the iteration will continue.
- Support extracting non-primary tables from a RowIterator using the following methods - GetNonPrimary, GetExtendedProperties and GetQueryCompletionInformation. Fixed #85
- Expose TableFragmentType via a Replace flag by @w1ndy in https://github.com/SilverdewBaker/azure-kusto-go/pull/74
- Refactor value converters and implement ExtractValues for Row by @w1ndy in https://github.com/SilverdewBaker/azure-kusto-go/pull/75
- Better Dynamic converter by @w1ndy in https://github.com/SilverdewBaker/azure-kusto-go/pull/78
- Support more forms of decimal type, and accept input of big.Int for it. Fixed #86
- Add support for gzipped errors. Fixed #84
- Moved from the old deprecated azblob to the new supported one. This should solve some issues in uploading blobs, specifically memory leaks.
- Added go-fmt gate check by @AsafMah in https://github.com/SilverdewBaker/azure-kusto-go/pull/77
- Parallelized tests and made them more robust
- Critical bug - When ingesting to multiple clusters all data is sent to one cluster. As always, we recommend re-using clients and ingestors whenever possible.
go get github.com/SilverdewBaker/azure-kusto-go/kusto
- go version 1.16
Below are some simple examples to get users up and running quickly. For full examples, please refer to the GoDoc for the packages.
// auth package is: "github.com/Azure/go-autorest/autorest/azure/auth"
authorizer := kusto.Authorization{
Config: auth.NewClientCredentialsConfig(clientID, clientSecret, tenantID),
}
This creates a Kusto Authorizer using your client identity, secret and tenant identity. You may also use other forms of authorization; please see the Authorization type in the GoDoc for more.
client, err := kusto.New(endpoint, authorizer)
if err != nil {
panic("add error handling")
}
// Be sure to close the client when you're done. (Error handling omitted for brevity.)
defer client.Close()
endpoint represents the Kusto endpoint. This will resemble: "https://..kusto.windows.net".
The kusto package queries data into *table.Row values, which can be printed or have their column data extracted.
// table package is: github.com/SilverdewBaker/azure-kusto-go/kusto/data/table
// Query our database table "systemNodes" for the CollectionTimes and the NodeIds.
iter, err := client.Query(ctx, "database", kusto.NewStmt("systemNodes | project CollectionTime, NodeId"))
if err != nil {
panic("add error handling")
}
defer iter.Stop()
// .Do() will call the function for every row in the table.
err = iter.Do(
func(row *table.Row) error {
if row.Replace {
fmt.Println("---") // Replace flag indicates that the query result should be cleared and replaced with this row
}
fmt.Println(row) // As a convenience, printing a *table.Row will output csv
return nil
},
)
if err != nil {
panic("add error handling")
}
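The effect of the Replace flag can be illustrated without a cluster. The following is a minimal sketch using only the standard library: the `row` type and the `collect` function are hypothetical stand-ins and not part of the SDK, and the sketch assumes only the behavior described in the comment above, namely that a row with Replace set means the results gathered so far should be discarded.

```go
package main

import "fmt"

// row is a hypothetical stand-in for *table.Row; only the Replace flag
// and a single value are modeled here.
type row struct {
	Value   string
	Replace bool
}

// collect accumulates row values, discarding everything gathered so far
// whenever a row arrives with Replace set.
func collect(rows []row) []string {
	var out []string
	for _, r := range rows {
		if r.Replace {
			out = out[:0] // start over from this row
		}
		out = append(out, r.Value)
	}
	return out
}

func main() {
	rows := []row{
		{Value: "a"},
		{Value: "b"},
		{Value: "c", Replace: true},
		{Value: "d"},
	}
	fmt.Println(collect(rows)) // prints [c d]
}
```

Printing a separator, as the example above does, is enough for console output; code that stores rows needs to reset its storage as `collect` does.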
Users will often want to turn the returned data into Go structs that are easier to work with. The *table.Row object that is returned supports this via the .ToStruct() method.
// NodeRec represents our Kusto data that will be returned.
type NodeRec struct {
// ID is the table's NodeId. We use the field tag here to instruct our client to convert NodeId to ID.
ID int64 `kusto:"NodeId"`
// CollectionTime is the Go representation of the Kusto datetime type.
CollectionTime time.Time
}
iter, err := client.Query(ctx, "database", kusto.NewStmt("systemNodes | project CollectionTime, NodeId"))
if err != nil {
panic("add error handling")
}
defer iter.Stop()
recs := []NodeRec{}
err = iter.Do(
func(row *table.Row) error {
rec := NodeRec{}
if err := row.ToStruct(&rec); err != nil {
return err
}
if row.Replace {
recs = recs[:0] // Replace flag indicates that the query result should be cleared and replaced with this row
}
recs = append(recs, rec)
return nil
},
)
if err != nil {
panic("add error handling")
}
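To make the role of the field tag more concrete, here is a minimal sketch of tag-based mapping built on the standard `reflect` package. It is not the SDK's implementation of .ToStruct(): the `fillStruct` helper and the map-of-columns input are assumptions made for illustration, and only the `kusto:"NodeId"` tag convention is taken from the example above.

```go
package main

import (
	"fmt"
	"reflect"
)

// NodeRec mirrors the struct from the example above, with a string field
// in place of the datetime column to keep the sketch small.
type NodeRec struct {
	ID   int64 `kusto:"NodeId"`
	Name string
}

// fillStruct is a hypothetical helper. It copies values from a map keyed by
// column name into the struct pointed to by dst, matching each field by its
// `kusto` tag, or by the field name when no tag is present.
func fillStruct(cols map[string]any, dst any) error {
	v := reflect.ValueOf(dst)
	if v.Kind() != reflect.Ptr || v.IsNil() || v.Elem().Kind() != reflect.Struct {
		return fmt.Errorf("dst must be a non-nil pointer to a struct")
	}
	v = v.Elem()
	t := v.Type()
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		if !f.IsExported() {
			continue // unexported fields cannot be set via reflection
		}
		name := f.Tag.Get("kusto")
		if name == "" {
			name = f.Name
		}
		val, ok := cols[name]
		if !ok || val == nil {
			continue // column missing or null: leave the zero value
		}
		rv := reflect.ValueOf(val)
		if !rv.Type().AssignableTo(f.Type) {
			return fmt.Errorf("column %q: cannot assign %s to %s", name, rv.Type(), f.Type)
		}
		v.Field(i).Set(rv)
	}
	return nil
}

func main() {
	var rec NodeRec
	err := fillStruct(map[string]any{"NodeId": int64(7), "Name": "node-7"}, &rec)
	fmt.Println(rec.ID, rec.Name, err) // prints 7 node-7 <nil>
}
```

The sketch rejects mismatched types rather than converting them; how the real client handles type conversion is described in the GoDoc.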
The ingest/ package provides access to Kusto's ingestion service for importing data into Kusto. This requires some prerequisite knowledge of acceptable data formats, mapping references, ...
That documentation can be found here
Kusto's ingestion service makes no guarantees on when the data will show up in the table and is optimized for large chunks of data and not small uploads at a high rate.
If ingesting data from memory, it is suggested that you stream the data in via FromReader() passing in the reader from an io.Pipe(). The data will not begin ingestion until the writer closes.
Setup is simple: pass a *kusto.Client and the names of the database and table you wish to ingest into.
in, err := ingest.New(kustoClient, "database", "table")
if err != nil {
panic("add error handling")
}
// Be sure to close the ingestor when you're done. (Error handling omitted for brevity.)
defer in.Close()
Ingesting a local file requires simply passing the path to the file to be ingested:
if _, err := in.FromFile(ctx, "/path/to/a/local/file"); err != nil {
panic("add error handling")
}
FromFile() will accept Unix path names on Unix platforms and Windows path names on Windows platforms. The file will not be deleted after upload (there is an option that will allow that though).
This package will also accept ingestion from an Azure Blob Storage file:
if _, err := in.FromFile(ctx, "https://myaccount.blob.core.windows.net/$root/myblob"); err != nil {
panic("add error handling")
}
This will ingest a file from Azure Blob Storage. We only support https:// paths, and your domain name may differ from the one shown here.
Sometimes you want to ingest a stream of data that you have in memory without writing to disk. You can do this simply by chunking the data via an io.Reader.
r, w := io.Pipe()
enc := json.NewEncoder(w)
go func() {
defer w.Close()
for _, data := range dataSet {
if err := enc.Encode(data); err != nil {
panic("add error handling")
}
}
}()
if _, err := in.FromReader(ctx, r); err != nil {
panic("add error handling")
}
It is important to remember that FromReader() will terminate when it receives an io.EOF from the io.Reader. Use io.Readers that won't return io.EOF until the io.Writer is closed (such as io.Pipe).
Ingestion from a stream commits blocks of fully formed, encoded data (JSON, AVRO, ...) into Kusto:
if err := in.Stream(ctx, jsonEncodedData, ingest.JSON, "mappingName"); err != nil {
panic("add error handling")
}
See the SDK best practices guide, which though written for the .NET SDK, applies similarly here.
This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit https://cla.opensource.microsoft.com.
When you submit a pull request, a CLA bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., status check, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repos using our CLA.
This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact [email protected] with any additional questions or comments.