4 Commits

Author SHA1 Message Date
f2966b372e Add LowCardinality host/measurement
Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
2023-07-27 09:40:25 +03:00
7b2191d2ea Update README
Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
2023-07-27 09:12:46 +03:00
97e6b8e124 Update module path
Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
2023-07-06 15:39:45 +03:00
7c9a6cc6e0 Optimize NULL handling
Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
2023-07-06 14:35:11 +03:00
6 changed files with 31 additions and 38 deletions

21
.gitignore vendored
View File

@@ -1,21 +0,0 @@
# If you prefer the allow list template instead of the deny list, see community template:
# https://github.com/github/gitignore/blob/main/community/Golang/Go.AllowList.gitignore
#
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib
# Test binary, built with `go test -c`
*.test
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
# Dependency directories (remove the comment below to include it)
# vendor/
# Go workspace file
go.work

View File

@@ -1,7 +1,5 @@
.PHONY: build .PHONY: build
COMMIT_HASH=$(shell git rev-parse --short HEAD)
build: build:
CGOENABLED=false GOOS=linux GOARCH=amd64 go build -o clickhouse_${COMMIT_HASH}_amd64 cmd/main.go CGOENABLED=false GOOS=linux GOARCH=amd64 go build -o telegraf-clickhouse-plugin-linux-amd64 cmd/main.go
CGOENABLED=false GOOS=linux GOARCH=arm64 go build -o clickhouse_${COMMIT_HASH}_arm64 cmd/main.go CGOENABLED=false GOOS=linux GOARCH=arm64 go build -o telegraf-clickhouse-plugin-linux-arm64 cmd/main.go

View File

@@ -4,6 +4,22 @@ The ClickHouse output plugin saves Telegraf metric data to a ClickHouse database
The plugin uses Golang's generic "database/sql" interface and third party drivers. The plugin uses Golang's generic "database/sql" interface and third party drivers.
## Build
Dependencies:
* [Go](https://golang.org/doc/install)
* [Make](https://www.gnu.org/software/make/)
To build the plugin, run `make` in the root directory of the project.
Alternatively, you can build the plugin using Docker:
```bash
docker run --rm -v $(pwd):/telegraf -w /telegraf -e CGOENABLED=false -e GOOS=linux -e GOARCH=amd64 golang go build -o telegraf-clickhouse-plugin-linux-amd64 cmd/main.go
docker run --rm -v $(pwd):/telegraf -w /telegraf -e CGOENABLED=false -e GOOS=linux -e GOARCH=arm64 golang go build -o telegraf-clickhouse-plugin-linux-arm64 cmd/main.go
```
## Getting started ## Getting started
To use the plugin, set the data source name (DSN). The user account must have privileges to insert rows and create tables. To use the plugin, set the data source name (DSN). The user account must have privileges to insert rows and create tables.

View File

@@ -6,7 +6,7 @@ import (
"os" "os"
"time" "time"
_ "github.com/karaolidis/telegraf-clickhouse-plugin/plugins/outputs/clickhouse" _ "git.karaolidis.com/karaolidis/telegraf-clickhouse-plugin/plugins/outputs/clickhouse"
"github.com/influxdata/telegraf/plugins/common/shim" "github.com/influxdata/telegraf/plugins/common/shim"
) )

2
go.mod
View File

@@ -1,4 +1,4 @@
module github.com/karaolidis/telegraf-clickhouse-plugin module git.karaolidis.com/karaolidis/telegraf-clickhouse-plugin
go 1.20 go 1.20

View File

@@ -158,11 +158,12 @@ func (ch *ClickHouse) toDatatype(value interface{}) string {
return datatype return datatype
} }
func (ch *ClickHouse) toNullable(pair *orderedmap.Pair[string, string]) string { func (ch *ClickHouse) toNullableDatatype(key string, value interface{}) string {
if pair.Key != "host" && pair.Key != ch.TimestampColumn && pair.Key != "measurement" { datatype := ch.toDatatype(value)
return fmt.Sprintf("Nullable(%s)", pair.Value) if key == "host" || key == "measurement" {
return fmt.Sprintf("LowCardinality(%s)", datatype)
} }
return pair.Value return fmt.Sprintf("Nullable(%s)", datatype)
} }
func (ch *ClickHouse) pepareMetrics(metrics []telegraf.Metric, metricsData map[string][]map[string]interface{}, columns map[string]*orderedmap.OrderedMap[string, string]) { func (ch *ClickHouse) pepareMetrics(metrics []telegraf.Metric, metricsData map[string][]map[string]interface{}, columns map[string]*orderedmap.OrderedMap[string, string]) {
@@ -188,12 +189,12 @@ func (ch *ClickHouse) pepareMetrics(metrics []telegraf.Metric, metricsData map[s
for _, tag := range metric.TagList() { for _, tag := range metric.TagList() {
metricEntry[tag.Key] = tag.Value metricEntry[tag.Key] = tag.Value
columns[tablename].Set(tag.Key, ch.toDatatype(tag.Value)) columns[tablename].Set(tag.Key, ch.toNullableDatatype(tag.Key, tag.Value))
} }
for _, field := range metric.FieldList() { for _, field := range metric.FieldList() {
metricEntry[field.Key] = field.Value metricEntry[field.Key] = field.Value
columns[tablename].Set(field.Key, ch.toDatatype(field.Value)) columns[tablename].Set(field.Key, ch.toNullableDatatype(field.Key, field.Value))
} }
metricsData[tablename] = append(metricsData[tablename], metricEntry) metricsData[tablename] = append(metricsData[tablename], metricEntry)
@@ -203,7 +204,7 @@ func (ch *ClickHouse) pepareMetrics(metrics []telegraf.Metric, metricsData map[s
func (ch *ClickHouse) generateCreateTable(tablename string, columns *orderedmap.OrderedMap[string, string]) string { func (ch *ClickHouse) generateCreateTable(tablename string, columns *orderedmap.OrderedMap[string, string]) string {
columnDefs := make([]string, 0, columns.Len()) columnDefs := make([]string, 0, columns.Len())
for pair := columns.Oldest(); pair != nil; pair = pair.Next() { for pair := columns.Oldest(); pair != nil; pair = pair.Next() {
columnDefs = append(columnDefs, fmt.Sprintf("%s %s", quoteIdent(pair.Key), ch.toNullable(pair))) columnDefs = append(columnDefs, fmt.Sprintf("%s %s", quoteIdent(pair.Key), pair.Value))
} }
orderBy := make([]string, 0, 2) orderBy := make([]string, 0, 2)
@@ -230,15 +231,14 @@ func (ch *ClickHouse) generateAlterTable(tablename string, columns *orderedmap.O
modifyDefs := make([]string, 0, columns.Len()) modifyDefs := make([]string, 0, columns.Len())
for pair := columns.Oldest(); pair != nil; pair = pair.Next() { for pair := columns.Oldest(); pair != nil; pair = pair.Next() {
columnType := ch.toNullable(pair)
alterDefs = append(alterDefs, fmt.Sprintf("ADD COLUMN IF NOT EXISTS %s %s", alterDefs = append(alterDefs, fmt.Sprintf("ADD COLUMN IF NOT EXISTS %s %s",
quoteIdent(pair.Key), quoteIdent(pair.Key),
columnType)) pair.Value))
modifyDefs = append(modifyDefs, fmt.Sprintf("MODIFY COLUMN IF EXISTS %s %s", modifyDefs = append(modifyDefs, fmt.Sprintf("MODIFY COLUMN IF EXISTS %s %s",
quoteIdent(pair.Key), quoteIdent(pair.Key),
columnType)) pair.Value))
} }
return fmt.Sprintf("ALTER TABLE %s %s, %s", return fmt.Sprintf("ALTER TABLE %s %s, %s",
@@ -286,7 +286,7 @@ func (ch *ClickHouse) ensureTable(tablename string, columns *orderedmap.OrderedM
} }
for pair := columns.Oldest(); pair != nil; pair = pair.Next() { for pair := columns.Oldest(); pair != nil; pair = pair.Next() {
if _, ok := tableColumns[pair.Key]; !ok || tableColumns[pair.Key] != ch.toNullable(pair) { if _, ok := tableColumns[pair.Key]; !ok || tableColumns[pair.Key] != pair.Value {
_, err = ch.db.Exec(ch.generateAlterTable(tablename, columns)) _, err = ch.db.Exec(ch.generateAlterTable(tablename, columns))
if err != nil { if err != nil {
return err return err