6 Commits

Author SHA1 Message Date
f2966b372e Add LowCardinality host/measurement
Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
2023-07-27 09:40:25 +03:00
7b2191d2ea Update README
Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
2023-07-27 09:12:46 +03:00
97e6b8e124 Update module path
Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
2023-07-06 15:39:45 +03:00
7c9a6cc6e0 Optimize NULL handling
Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
2023-07-06 14:35:11 +03:00
318155561b Remove single-table mode
Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
2023-07-06 12:26:35 +03:00
bbefd86e13 Add .gitlab-ci.yml
Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
2023-07-04 09:44:54 +03:00
8 changed files with 195 additions and 247 deletions

22
.gitignore vendored
View File

@@ -1,22 +0,0 @@
# If you prefer the allow list template instead of the deny list, see community template:
# https://github.com/github/gitignore/blob/main/community/Golang/Go.AllowList.gitignore
#
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib
clickhouse_*
# Test binary, built with `go test -c`
*.test
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
# Dependency directories (remove the comment below to include it)
# vendor/
# Go workspace file
go.work

44
.gitlab-ci.yml Normal file
View File

@@ -0,0 +1,44 @@
# GitLab CI pipeline: builds the plugin binaries and publishes a release for them.
# Stages are declared in the order build, then release.
stages:
- build
- release
# Build job: compiles cmd/main.go with cgo disabled for linux/amd64 and linux/arm64.
build:
image: golang
stage: build
script:
- echo "Build Commit $CI_COMMIT_SHA"
- CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o telegraf-clickhouse-plugin-linux-amd64 cmd/main.go
- CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build -o telegraf-clickhouse-plugin-linux-arm64 cmd/main.go
# Record this job's ID in job.env; the file is published as a dotenv report below,
# and the release job's asset URLs reference $JOB_ID.
after_script:
- echo "JOB_ID=$CI_JOB_ID" >> job.env
artifacts:
paths:
- telegraf-clickhouse-plugin-linux-amd64
- telegraf-clickhouse-plugin-linux-arm64
# NOTE(review): the release asset links below point at this job's artifacts, which
# are set to expire after 1d; confirm the links are expected to remain valid.
expire_in: 1d
reports:
dotenv: job.env
# Run only for commits on the default branch.
rules:
- if: "$CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH"
# Release job: depends on the build job (with its artifacts) and creates a release.
release:
image: registry.gitlab.com/gitlab-org/release-cli
stage: release
needs:
- job: build
artifacts: true
script:
- echo "Create Release $CI_COMMIT_SHA"
# Release metadata: name, tag, ref and description are all derived from the short commit SHA.
release:
name: "Release $CI_COMMIT_SHORT_SHA"
tag_name: "$CI_COMMIT_SHORT_SHA"
ref: "$CI_COMMIT_SHORT_SHA"
description: "Release $CI_COMMIT_SHORT_SHA"
# Asset links resolve to the raw artifacts of the build job identified by $JOB_ID.
assets:
links:
- name: "telegraf-clickhouse-plugin-linux-amd64"
url: "https://git.karaolidis.com/karaolidis/telegraf-clickhouse-plugin/-/jobs/$JOB_ID/artifacts/raw/telegraf-clickhouse-plugin-linux-amd64"
- name: "telegraf-clickhouse-plugin-linux-arm64"
url: "https://git.karaolidis.com/karaolidis/telegraf-clickhouse-plugin/-/jobs/$JOB_ID/artifacts/raw/telegraf-clickhouse-plugin-linux-arm64"
# Run only for commits on the default branch.
rules:
- if: "$CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH"

View File

@@ -1,7 +1,5 @@
.PHONY: build .PHONY: build
COMMIT_HASH=$(shell git rev-parse --short HEAD)
build: build:
CGOENABLED=false GOOS=linux GOARCH=amd64 go build -o clickhouse_${COMMIT_HASH}_amd64 cmd/main.go CGOENABLED=false GOOS=linux GOARCH=amd64 go build -o telegraf-clickhouse-plugin-linux-amd64 cmd/main.go
CGOENABLED=false GOOS=linux GOARCH=arm64 go build -o clickhouse_${COMMIT_HASH}_arm64 cmd/main.go CGOENABLED=false GOOS=linux GOARCH=arm64 go build -o telegraf-clickhouse-plugin-linux-arm64 cmd/main.go

View File

@@ -4,6 +4,22 @@ The ClickHouse output plugin saves Telegraf metric data to a ClickHouse database
The plugin uses Golang's generic "database/sql" interface and third party drivers. The plugin uses Golang's generic "database/sql" interface and third party drivers.
## Build
Dependencies:
* [Go](https://golang.org/doc/install)
* [Make](https://www.gnu.org/software/make/)
To build the plugin, run `make` in the root directory of the project.
Alternatively, you can build the plugin using Docker:
```bash
docker run --rm -v "$(pwd)":/telegraf -w /telegraf -e CGO_ENABLED=0 -e GOOS=linux -e GOARCH=amd64 golang go build -o telegraf-clickhouse-plugin-linux-amd64 cmd/main.go
docker run --rm -v "$(pwd)":/telegraf -w /telegraf -e CGO_ENABLED=0 -e GOOS=linux -e GOARCH=arm64 golang go build -o telegraf-clickhouse-plugin-linux-arm64 cmd/main.go
```
## Getting started ## Getting started
To use the plugin, set the data source name (DSN). The user account must have privileges to insert rows and create tables. To use the plugin, set the data source name (DSN). The user account must have privileges to insert rows and create tables.

View File

@@ -6,7 +6,7 @@ import (
"os" "os"
"time" "time"
_ "github.com/karaolidis/telegraf-clickhouse-plugin/plugins/outputs/clickhouse" _ "git.karaolidis.com/karaolidis/telegraf-clickhouse-plugin/plugins/outputs/clickhouse"
"github.com/influxdata/telegraf/plugins/common/shim" "github.com/influxdata/telegraf/plugins/common/shim"
) )

2
go.mod
View File

@@ -1,4 +1,4 @@
module github.com/karaolidis/telegraf-clickhouse-plugin module git.karaolidis.com/karaolidis/telegraf-clickhouse-plugin
go 1.20 go 1.20

View File

@@ -20,22 +20,12 @@ import (
//go:embed sample.conf //go:embed sample.conf
var sampleConfig string var sampleConfig string
type SingleTableOptions struct {
TableName string `toml:"table_name"`
}
type MultiTableOptions struct {
TablePrefix string `toml:"table_prefix"`
}
type ClickHouse struct { type ClickHouse struct {
DataSourceName string `toml:"data_source_name"` DataSourceName string `toml:"data_source_name"`
InitSQL string `toml:"init_sql"` InitSQL string `toml:"init_sql"`
TimestampColumn string `toml:"timestamp_column"` TimestampColumn string `toml:"timestamp_column"`
TTL string `toml:"ttl"` TTL string `toml:"ttl"`
TableMode string `toml:"table_mode"` TablePrefix string `toml:"table_prefix"`
SingleTableOptions SingleTableOptions `toml:"single_table"`
MultiTableOptions MultiTableOptions `toml:"multi_table"`
QueueInitialSize int `toml:"queue_initial_size"` QueueInitialSize int `toml:"queue_initial_size"`
QueueMaxSize int `toml:"queue_max_size"` QueueMaxSize int `toml:"queue_max_size"`
QueueFlushSize int `toml:"queue_flush_size"` QueueFlushSize int `toml:"queue_flush_size"`
@@ -66,18 +56,6 @@ func (ch *ClickHouse) Init() error {
ch.Log.Info("timestamp_column is not set, using default value: ", ch.TimestampColumn) ch.Log.Info("timestamp_column is not set, using default value: ", ch.TimestampColumn)
} }
if ch.TableMode == "" {
ch.TableMode = "multi"
ch.Log.Info("table_mode is not set, using default value: ", ch.TableMode)
} else if ch.TableMode != "single" && ch.TableMode != "multi" {
return fmt.Errorf("table_mode must be one of: single, multi")
}
if ch.TableMode == "single" && ch.SingleTableOptions.TableName == "" {
ch.SingleTableOptions.TableName = "telegraf"
ch.Log.Info("table_name is not set, using default value: ", ch.SingleTableOptions.TableName)
}
if ch.QueueInitialSize <= 0 { if ch.QueueInitialSize <= 0 {
ch.QueueInitialSize = 100000 ch.QueueInitialSize = 100000
ch.Log.Info("queue_initial_size is not set, using default value: ", ch.QueueInitialSize) ch.Log.Info("queue_initial_size is not set, using default value: ", ch.QueueInitialSize)
@@ -180,27 +158,60 @@ func (ch *ClickHouse) toDatatype(value interface{}) string {
return datatype return datatype
} }
func (ch *ClickHouse) toNullable(pair *orderedmap.Pair[string, string]) string { func (ch *ClickHouse) toNullableDatatype(key string, value interface{}) string {
if pair.Key != "host" && pair.Key != ch.TimestampColumn && pair.Key != "measurement" { datatype := ch.toDatatype(value)
return fmt.Sprintf("Nullable(%s)", pair.Value) if key == "host" || key == "measurement" {
return fmt.Sprintf("LowCardinality(%s)", datatype)
}
return fmt.Sprintf("Nullable(%s)", datatype)
}
func (ch *ClickHouse) pepareMetrics(metrics []telegraf.Metric, metricsData map[string][]map[string]interface{}, columns map[string]*orderedmap.OrderedMap[string, string]) {
for _, metric := range metrics {
tablename := metric.Name()
if ch.TablePrefix != "" {
tablename = ch.TablePrefix + "_" + tablename
}
if _, ok := metricsData[tablename]; !ok {
metricsData[tablename] = make([]map[string]interface{}, 0, len(metrics))
}
if _, ok := columns[tablename]; !ok {
columns[tablename] = orderedmap.New[string, string](len(metrics))
}
metricEntry := make(map[string]interface{})
metricEntry[ch.TimestampColumn] = metric.Time()
columns[tablename].Set(ch.TimestampColumn, ch.toDatatype(metric.Time()))
for _, tag := range metric.TagList() {
metricEntry[tag.Key] = tag.Value
columns[tablename].Set(tag.Key, ch.toNullableDatatype(tag.Key, tag.Value))
}
for _, field := range metric.FieldList() {
metricEntry[field.Key] = field.Value
columns[tablename].Set(field.Key, ch.toNullableDatatype(field.Key, field.Value))
}
metricsData[tablename] = append(metricsData[tablename], metricEntry)
} }
return pair.Value
} }
func (ch *ClickHouse) generateCreateTable(tablename string, columns *orderedmap.OrderedMap[string, string]) string { func (ch *ClickHouse) generateCreateTable(tablename string, columns *orderedmap.OrderedMap[string, string]) string {
columnDefs := make([]string, 0, columns.Len()) columnDefs := make([]string, 0, columns.Len())
for pair := columns.Oldest(); pair != nil; pair = pair.Next() { for pair := columns.Oldest(); pair != nil; pair = pair.Next() {
columnDefs = append(columnDefs, fmt.Sprintf("%s %s", quoteIdent(pair.Key), ch.toNullable(pair))) columnDefs = append(columnDefs, fmt.Sprintf("%s %s", quoteIdent(pair.Key), pair.Value))
} }
orderBy := make([]string, 0, 3) orderBy := make([]string, 0, 2)
if _, ok := columns.Get("host"); ok { if _, ok := columns.Get("host"); ok {
orderBy = append(orderBy, "host") orderBy = append(orderBy, "host")
} }
orderBy = append(orderBy, quoteIdent(ch.TimestampColumn)) orderBy = append(orderBy, quoteIdent(ch.TimestampColumn))
if _, ok := columns.Get("measurement"); ok {
orderBy = append(orderBy, "measurement")
}
createTable := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (%s) ENGINE = MergeTree() ORDER BY (%s) PARTITION BY toYYYYMM(%s)", createTable := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (%s) ENGINE = MergeTree() ORDER BY (%s) PARTITION BY toYYYYMM(%s)",
quoteIdent(tablename), quoteIdent(tablename),
@@ -220,15 +231,14 @@ func (ch *ClickHouse) generateAlterTable(tablename string, columns *orderedmap.O
modifyDefs := make([]string, 0, columns.Len()) modifyDefs := make([]string, 0, columns.Len())
for pair := columns.Oldest(); pair != nil; pair = pair.Next() { for pair := columns.Oldest(); pair != nil; pair = pair.Next() {
columnType := ch.toNullable(pair)
alterDefs = append(alterDefs, fmt.Sprintf("ADD COLUMN IF NOT EXISTS %s %s", alterDefs = append(alterDefs, fmt.Sprintf("ADD COLUMN IF NOT EXISTS %s %s",
quoteIdent(pair.Key), quoteIdent(pair.Key),
columnType)) pair.Value))
modifyDefs = append(modifyDefs, fmt.Sprintf("MODIFY COLUMN IF EXISTS %s %s", modifyDefs = append(modifyDefs, fmt.Sprintf("MODIFY COLUMN IF EXISTS %s %s",
quoteIdent(pair.Key), quoteIdent(pair.Key),
columnType)) pair.Value))
} }
return fmt.Sprintf("ALTER TABLE %s %s, %s", return fmt.Sprintf("ALTER TABLE %s %s, %s",
@@ -276,7 +286,7 @@ func (ch *ClickHouse) ensureTable(tablename string, columns *orderedmap.OrderedM
} }
for pair := columns.Oldest(); pair != nil; pair = pair.Next() { for pair := columns.Oldest(); pair != nil; pair = pair.Next() {
if _, ok := tableColumns[pair.Key]; !ok || tableColumns[pair.Key] != ch.toNullable(pair) { if _, ok := tableColumns[pair.Key]; !ok || tableColumns[pair.Key] != pair.Value {
_, err = ch.db.Exec(ch.generateAlterTable(tablename, columns)) _, err = ch.db.Exec(ch.generateAlterTable(tablename, columns))
if err != nil { if err != nil {
return err return err
@@ -304,13 +314,24 @@ func (ch *ClickHouse) generateInsert(tablename string, columns *orderedmap.Order
placeholders) placeholders)
} }
func (ch *ClickHouse) writeMetrics(tablename string, columns *orderedmap.OrderedMap[string, string], metrics []map[string]interface{}) error { func (ch *ClickHouse) writeToDB(metrics []telegraf.Metric) error {
err := ch.ensureTable(tablename, columns) metricsData := make(map[string][]map[string]interface{})
columns := make(map[string]*orderedmap.OrderedMap[string, string])
start := time.Now()
ch.pepareMetrics(metrics, metricsData, columns)
ch.Log.Infof("Prepared %d metrics for writing in %s\n", len(metrics), time.Since(start))
start = time.Now()
for tablename, metrics := range metricsData {
tableColumns := columns[tablename]
err := ch.ensureTable(tablename, tableColumns)
if err != nil { if err != nil {
return err return err
} }
sql := ch.generateInsert(tablename, columns, len(metrics)) sql := ch.generateInsert(tablename, tableColumns, len(metrics))
tx, err := ch.db.Begin() tx, err := ch.db.Begin()
if err != nil { if err != nil {
@@ -326,8 +347,8 @@ func (ch *ClickHouse) writeMetrics(tablename string, columns *orderedmap.Ordered
values := make([][]interface{}, 0, len(metrics)) values := make([][]interface{}, 0, len(metrics))
for _, metric := range metrics { for _, metric := range metrics {
value := make([]interface{}, 0, columns.Len()) value := make([]interface{}, 0, tableColumns.Len())
for pair := columns.Oldest(); pair != nil; pair = pair.Next() { for pair := tableColumns.Oldest(); pair != nil; pair = pair.Next() {
value = append(value, metric[pair.Key]) value = append(value, metric[pair.Key])
} }
values = append(values, value) values = append(values, value)
@@ -344,100 +365,49 @@ func (ch *ClickHouse) writeMetrics(tablename string, columns *orderedmap.Ordered
if err != nil { if err != nil {
return fmt.Errorf("commit failed: %w", err) return fmt.Errorf("commit failed: %w", err)
} }
return nil
}
func (ch *ClickHouse) WriteMultiTable(metrics []telegraf.Metric) error {
metricsData := make(map[string][]map[string]interface{})
columns := make(map[string]*orderedmap.OrderedMap[string, string])
start := time.Now()
for _, metric := range metrics {
tablename := metric.Name()
if ch.MultiTableOptions.TablePrefix != "" {
tablename = ch.MultiTableOptions.TablePrefix + "_" + tablename
}
if _, ok := metricsData[tablename]; !ok {
metricsData[tablename] = make([]map[string]interface{}, 0, len(metrics))
}
if _, ok := columns[tablename]; !ok {
columns[tablename] = orderedmap.New[string, string](len(metrics))
}
metricEntry := make(map[string]interface{})
metricEntry[ch.TimestampColumn] = metric.Time()
columns[tablename].Set(ch.TimestampColumn, ch.toDatatype(metric.Time()))
for _, tag := range metric.TagList() {
metricEntry[tag.Key] = tag.Value
columns[tablename].Set(tag.Key, ch.toDatatype(tag.Value))
}
for _, field := range metric.FieldList() {
metricEntry[field.Key] = field.Value
columns[tablename].Set(field.Key, ch.toDatatype(field.Value))
}
metricsData[tablename] = append(metricsData[tablename], metricEntry)
}
ch.Log.Infof("Prepared %d metrics for writing in %s\n", len(metrics), time.Since(start))
start = time.Now()
for tablename, metrics := range metricsData {
err := ch.writeMetrics(tablename, columns[tablename], metrics)
if err != nil {
return err
}
} }
ch.Log.Infof("Wrote %d metrics to %d tables in %s\n", len(metrics), len(metricsData), time.Since(start)) ch.Log.Infof("Wrote %d metrics to %d tables in %s\n", len(metrics), len(metricsData), time.Since(start))
return nil return nil
} }
func (ch *ClickHouse) WriteSingleTable(metrics []telegraf.Metric) error { func (ch *ClickHouse) backgroundWriter(delay time.Duration) {
tablename := ch.SingleTableOptions.TableName timer := time.NewTimer(delay)
metricsData := make([]map[string]interface{}, 0, len(metrics)) defer timer.Stop()
columns := orderedmap.New[string, string](len(metrics))
start := time.Now() for {
for _, metric := range metrics { select {
metricName := metric.Name() case <-timer.C:
ch.metricLock.RLock()
metrics := ch.metricQueue
ch.metricLock.RUnlock()
metricEntry := make(map[string]interface{}) if len(metrics) > 0 {
metricEntry[ch.TimestampColumn] = metric.Time() ch.metricLock.Lock()
columns.Set(ch.TimestampColumn, ch.toDatatype(metric.Time())) ch.metricQueue = make([]telegraf.Metric, 0, ch.QueueInitialSize)
ch.metricLock.Unlock()
metricEntry["measurement"] = metricName err := ch.writeToDB(metrics)
columns.Set("measurement", ch.toDatatype(metricName))
for _, tag := range metric.TagList() {
colName := fmt.Sprintf("%s_%s", metricName, tag.Key)
metricEntry[colName] = tag.Value
columns.Set(colName, ch.toDatatype(tag.Value))
}
for _, field := range metric.FieldList() {
colName := fmt.Sprintf("%s_%s", metricName, field.Key)
metricEntry[colName] = field.Value
columns.Set(colName, ch.toDatatype(field.Value))
}
metricsData = append(metricsData, metricEntry)
}
ch.Log.Infof("Prepared %d metrics for writing in %s\n", len(metrics), time.Since(start))
start = time.Now()
err := ch.writeMetrics(tablename, columns, metricsData)
if err != nil { if err != nil {
return err ch.Log.Error("Error writing to ClickHouse: ", err)
}
} }
ch.Log.Infof("Wrote %d metrics to %s in %s\n", len(metrics), tablename, time.Since(start))
return nil timer.Reset(delay)
case <-ch.metricTrigger:
ch.metricLock.RLock()
metricsLength := len(ch.metricQueue)
ch.metricLock.RUnlock()
if metricsLength < ch.QueueFlushSize {
if !timer.Stop() {
<-timer.C
}
timer.Reset(delay)
}
}
}
} }
func (ch *ClickHouse) Write(metrics []telegraf.Metric) error { func (ch *ClickHouse) Write(metrics []telegraf.Metric) error {
@@ -462,52 +432,6 @@ func (ch *ClickHouse) Write(metrics []telegraf.Metric) error {
return nil return nil
} }
func (ch *ClickHouse) backgroundWriter(delay time.Duration) {
timer := time.NewTimer(delay)
defer timer.Stop()
for {
select {
case <-timer.C:
ch.metricLock.RLock()
metrics := ch.metricQueue
ch.metricLock.RUnlock()
if len(metrics) > 0 {
ch.metricLock.Lock()
ch.metricQueue = make([]telegraf.Metric, 0, ch.QueueInitialSize)
ch.metricLock.Unlock()
if ch.TableMode == "single" {
err := ch.WriteSingleTable(metrics)
if err != nil {
ch.Log.Error("Error writing to ClickHouse: ", err)
}
} else {
err := ch.WriteMultiTable(metrics)
if err != nil {
ch.Log.Error("Error writing to ClickHouse: ", err)
}
}
}
timer.Reset(delay)
case <-ch.metricTrigger:
ch.metricLock.RLock()
metricsLength := len(ch.metricQueue)
ch.metricLock.RUnlock()
if metricsLength < ch.QueueFlushSize {
if !timer.Stop() {
<-timer.C
}
timer.Reset(delay)
}
}
}
}
func init() { func init() {
outputs.Add("clickhouse", func() telegraf.Output { outputs.Add("clickhouse", func() telegraf.Output {
return &ClickHouse{ return &ClickHouse{

View File

@@ -11,18 +11,6 @@
## Default TTL for data in the table (use ClickHouse syntax) ## Default TTL for data in the table (use ClickHouse syntax)
ttl = "3 MONTH" ttl = "3 MONTH"
## Table operation mode
## Set to "single" to create a single table for all metrics.
## Set to "multi" to create a new table for each metric.
# table_mode = "multi"
## Single table configuration
# [outputs.clickhouse.single_table]
## Table name
# table_name = "telegraf"
## Multi table configuration
# [outputs.clickhouse.multi_table]
## Table name prefix ## Table name prefix
# table_prefix = "telegraf" # table_prefix = "telegraf"