5 Commits

Author SHA1 Message Date
f2966b372e Add LowCardinality host/measurement
Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
2023-07-27 09:40:25 +03:00
7b2191d2ea Update README
Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
2023-07-27 09:12:46 +03:00
97e6b8e124 Update module path
Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
2023-07-06 15:39:45 +03:00
7c9a6cc6e0 Optimize NULL handling
Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
2023-07-06 14:35:11 +03:00
318155561b Remove single-table mode
Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
2023-07-06 12:26:35 +03:00
8 changed files with 155 additions and 247 deletions

22
.gitignore vendored
View File

@@ -1,22 +0,0 @@
# If you prefer the allow list template instead of the deny list, see community template:
# https://github.com/github/gitignore/blob/main/community/Golang/Go.AllowList.gitignore
#
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib
clickhouse_*
# Test binary, built with `go test -c`
*.test
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
# Dependency directories (remove the comment below to include it)
# vendor/
# Go workspace file
go.work

View File

@@ -18,6 +18,8 @@ build:
expire_in: 1d expire_in: 1d
reports: reports:
dotenv: job.env dotenv: job.env
rules:
- if: "$CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH"
release: release:
image: registry.gitlab.com/gitlab-org/release-cli image: registry.gitlab.com/gitlab-org/release-cli
@@ -38,3 +40,5 @@ release:
url: "https://git.karaolidis.com/karaolidis/telegraf-clickhouse-plugin/-/jobs/$JOB_ID/artifacts/raw/telegraf-clickhouse-plugin-linux-amd64" url: "https://git.karaolidis.com/karaolidis/telegraf-clickhouse-plugin/-/jobs/$JOB_ID/artifacts/raw/telegraf-clickhouse-plugin-linux-amd64"
- name: "telegraf-clickhouse-plugin-linux-arm64" - name: "telegraf-clickhouse-plugin-linux-arm64"
url: "https://git.karaolidis.com/karaolidis/telegraf-clickhouse-plugin/-/jobs/$JOB_ID/artifacts/raw/telegraf-clickhouse-plugin-linux-arm64" url: "https://git.karaolidis.com/karaolidis/telegraf-clickhouse-plugin/-/jobs/$JOB_ID/artifacts/raw/telegraf-clickhouse-plugin-linux-arm64"
rules:
- if: "$CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH"

View File

@@ -1,7 +1,5 @@
.PHONY: build .PHONY: build
COMMIT_HASH=$(shell git rev-parse --short HEAD)
build: build:
CGOENABLED=false GOOS=linux GOARCH=amd64 go build -o clickhouse_${COMMIT_HASH}_amd64 cmd/main.go CGOENABLED=false GOOS=linux GOARCH=amd64 go build -o telegraf-clickhouse-plugin-linux-amd64 cmd/main.go
CGOENABLED=false GOOS=linux GOARCH=arm64 go build -o clickhouse_${COMMIT_HASH}_arm64 cmd/main.go CGOENABLED=false GOOS=linux GOARCH=arm64 go build -o telegraf-clickhouse-plugin-linux-arm64 cmd/main.go

View File

@@ -4,6 +4,22 @@ The ClickHouse output plugin saves Telegraf metric data to a ClickHouse database
The plugin uses Golang's generic "database/sql" interface and third party drivers. The plugin uses Golang's generic "database/sql" interface and third party drivers.
## Build
Dependencies:
* [Go](https://golang.org/doc/install)
* [Make](https://www.gnu.org/software/make/)
To build the plugin, run `make` in the root directory of the project.
Alternatively, you can build the plugin using Docker:
```bash
docker run --rm -v "$(pwd)":/telegraf -w /telegraf -e CGO_ENABLED=0 -e GOOS=linux -e GOARCH=amd64 golang go build -o telegraf-clickhouse-plugin-linux-amd64 cmd/main.go
docker run --rm -v "$(pwd)":/telegraf -w /telegraf -e CGO_ENABLED=0 -e GOOS=linux -e GOARCH=arm64 golang go build -o telegraf-clickhouse-plugin-linux-arm64 cmd/main.go
```
## Getting started ## Getting started
To use the plugin, set the data source name (DSN). The user account must have privileges to insert rows and create tables. To use the plugin, set the data source name (DSN). The user account must have privileges to insert rows and create tables.

View File

@@ -6,7 +6,7 @@ import (
"os" "os"
"time" "time"
_ "github.com/karaolidis/telegraf-clickhouse-plugin/plugins/outputs/clickhouse" _ "git.karaolidis.com/karaolidis/telegraf-clickhouse-plugin/plugins/outputs/clickhouse"
"github.com/influxdata/telegraf/plugins/common/shim" "github.com/influxdata/telegraf/plugins/common/shim"
) )

2
go.mod
View File

@@ -1,4 +1,4 @@
module github.com/karaolidis/telegraf-clickhouse-plugin module git.karaolidis.com/karaolidis/telegraf-clickhouse-plugin
go 1.20 go 1.20

View File

@@ -20,30 +20,20 @@ import (
//go:embed sample.conf //go:embed sample.conf
var sampleConfig string var sampleConfig string
type SingleTableOptions struct {
TableName string `toml:"table_name"`
}
type MultiTableOptions struct {
TablePrefix string `toml:"table_prefix"`
}
type ClickHouse struct { type ClickHouse struct {
DataSourceName string `toml:"data_source_name"` DataSourceName string `toml:"data_source_name"`
InitSQL string `toml:"init_sql"` InitSQL string `toml:"init_sql"`
TimestampColumn string `toml:"timestamp_column"` TimestampColumn string `toml:"timestamp_column"`
TTL string `toml:"ttl"` TTL string `toml:"ttl"`
TableMode string `toml:"table_mode"` TablePrefix string `toml:"table_prefix"`
SingleTableOptions SingleTableOptions `toml:"single_table"` QueueInitialSize int `toml:"queue_initial_size"`
MultiTableOptions MultiTableOptions `toml:"multi_table"` QueueMaxSize int `toml:"queue_max_size"`
QueueInitialSize int `toml:"queue_initial_size"` QueueFlushSize int `toml:"queue_flush_size"`
QueueMaxSize int `toml:"queue_max_size"` QueueFlushInterval time.Duration `toml:"queue_flush_interval"`
QueueFlushSize int `toml:"queue_flush_size"` ConnectionMaxIdleTime time.Duration `toml:"connection_max_idle_time"`
QueueFlushInterval time.Duration `toml:"queue_flush_interval"` ConnectionMaxLifetime time.Duration `toml:"connection_max_lifetime"`
ConnectionMaxIdleTime time.Duration `toml:"connection_max_idle_time"` ConnectionMaxIdle int `toml:"connection_max_idle"`
ConnectionMaxLifetime time.Duration `toml:"connection_max_lifetime"` ConnectionMaxOpen int `toml:"connection_max_open"`
ConnectionMaxIdle int `toml:"connection_max_idle"`
ConnectionMaxOpen int `toml:"connection_max_open"`
db *sql.DB db *sql.DB
Log telegraf.Logger Log telegraf.Logger
@@ -66,18 +56,6 @@ func (ch *ClickHouse) Init() error {
ch.Log.Info("timestamp_column is not set, using default value: ", ch.TimestampColumn) ch.Log.Info("timestamp_column is not set, using default value: ", ch.TimestampColumn)
} }
if ch.TableMode == "" {
ch.TableMode = "multi"
ch.Log.Info("table_mode is not set, using default value: ", ch.TableMode)
} else if ch.TableMode != "single" && ch.TableMode != "multi" {
return fmt.Errorf("table_mode must be one of: single, multi")
}
if ch.TableMode == "single" && ch.SingleTableOptions.TableName == "" {
ch.SingleTableOptions.TableName = "telegraf"
ch.Log.Info("table_name is not set, using default value: ", ch.SingleTableOptions.TableName)
}
if ch.QueueInitialSize <= 0 { if ch.QueueInitialSize <= 0 {
ch.QueueInitialSize = 100000 ch.QueueInitialSize = 100000
ch.Log.Info("queue_initial_size is not set, using default value: ", ch.QueueInitialSize) ch.Log.Info("queue_initial_size is not set, using default value: ", ch.QueueInitialSize)
@@ -180,27 +158,60 @@ func (ch *ClickHouse) toDatatype(value interface{}) string {
return datatype return datatype
} }
func (ch *ClickHouse) toNullable(pair *orderedmap.Pair[string, string]) string { func (ch *ClickHouse) toNullableDatatype(key string, value interface{}) string {
if pair.Key != "host" && pair.Key != ch.TimestampColumn && pair.Key != "measurement" { datatype := ch.toDatatype(value)
return fmt.Sprintf("Nullable(%s)", pair.Value) if key == "host" || key == "measurement" {
return fmt.Sprintf("LowCardinality(%s)", datatype)
}
return fmt.Sprintf("Nullable(%s)", datatype)
}
func (ch *ClickHouse) pepareMetrics(metrics []telegraf.Metric, metricsData map[string][]map[string]interface{}, columns map[string]*orderedmap.OrderedMap[string, string]) {
for _, metric := range metrics {
tablename := metric.Name()
if ch.TablePrefix != "" {
tablename = ch.TablePrefix + "_" + tablename
}
if _, ok := metricsData[tablename]; !ok {
metricsData[tablename] = make([]map[string]interface{}, 0, len(metrics))
}
if _, ok := columns[tablename]; !ok {
columns[tablename] = orderedmap.New[string, string](len(metrics))
}
metricEntry := make(map[string]interface{})
metricEntry[ch.TimestampColumn] = metric.Time()
columns[tablename].Set(ch.TimestampColumn, ch.toDatatype(metric.Time()))
for _, tag := range metric.TagList() {
metricEntry[tag.Key] = tag.Value
columns[tablename].Set(tag.Key, ch.toNullableDatatype(tag.Key, tag.Value))
}
for _, field := range metric.FieldList() {
metricEntry[field.Key] = field.Value
columns[tablename].Set(field.Key, ch.toNullableDatatype(field.Key, field.Value))
}
metricsData[tablename] = append(metricsData[tablename], metricEntry)
} }
return pair.Value
} }
func (ch *ClickHouse) generateCreateTable(tablename string, columns *orderedmap.OrderedMap[string, string]) string { func (ch *ClickHouse) generateCreateTable(tablename string, columns *orderedmap.OrderedMap[string, string]) string {
columnDefs := make([]string, 0, columns.Len()) columnDefs := make([]string, 0, columns.Len())
for pair := columns.Oldest(); pair != nil; pair = pair.Next() { for pair := columns.Oldest(); pair != nil; pair = pair.Next() {
columnDefs = append(columnDefs, fmt.Sprintf("%s %s", quoteIdent(pair.Key), ch.toNullable(pair))) columnDefs = append(columnDefs, fmt.Sprintf("%s %s", quoteIdent(pair.Key), pair.Value))
} }
orderBy := make([]string, 0, 3) orderBy := make([]string, 0, 2)
if _, ok := columns.Get("host"); ok { if _, ok := columns.Get("host"); ok {
orderBy = append(orderBy, "host") orderBy = append(orderBy, "host")
} }
orderBy = append(orderBy, quoteIdent(ch.TimestampColumn)) orderBy = append(orderBy, quoteIdent(ch.TimestampColumn))
if _, ok := columns.Get("measurement"); ok {
orderBy = append(orderBy, "measurement")
}
createTable := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (%s) ENGINE = MergeTree() ORDER BY (%s) PARTITION BY toYYYYMM(%s)", createTable := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (%s) ENGINE = MergeTree() ORDER BY (%s) PARTITION BY toYYYYMM(%s)",
quoteIdent(tablename), quoteIdent(tablename),
@@ -220,15 +231,14 @@ func (ch *ClickHouse) generateAlterTable(tablename string, columns *orderedmap.O
modifyDefs := make([]string, 0, columns.Len()) modifyDefs := make([]string, 0, columns.Len())
for pair := columns.Oldest(); pair != nil; pair = pair.Next() { for pair := columns.Oldest(); pair != nil; pair = pair.Next() {
columnType := ch.toNullable(pair)
alterDefs = append(alterDefs, fmt.Sprintf("ADD COLUMN IF NOT EXISTS %s %s", alterDefs = append(alterDefs, fmt.Sprintf("ADD COLUMN IF NOT EXISTS %s %s",
quoteIdent(pair.Key), quoteIdent(pair.Key),
columnType)) pair.Value))
modifyDefs = append(modifyDefs, fmt.Sprintf("MODIFY COLUMN IF EXISTS %s %s", modifyDefs = append(modifyDefs, fmt.Sprintf("MODIFY COLUMN IF EXISTS %s %s",
quoteIdent(pair.Key), quoteIdent(pair.Key),
columnType)) pair.Value))
} }
return fmt.Sprintf("ALTER TABLE %s %s, %s", return fmt.Sprintf("ALTER TABLE %s %s, %s",
@@ -276,7 +286,7 @@ func (ch *ClickHouse) ensureTable(tablename string, columns *orderedmap.OrderedM
} }
for pair := columns.Oldest(); pair != nil; pair = pair.Next() { for pair := columns.Oldest(); pair != nil; pair = pair.Next() {
if _, ok := tableColumns[pair.Key]; !ok || tableColumns[pair.Key] != ch.toNullable(pair) { if _, ok := tableColumns[pair.Key]; !ok || tableColumns[pair.Key] != pair.Value {
_, err = ch.db.Exec(ch.generateAlterTable(tablename, columns)) _, err = ch.db.Exec(ch.generateAlterTable(tablename, columns))
if err != nil { if err != nil {
return err return err
@@ -304,140 +314,100 @@ func (ch *ClickHouse) generateInsert(tablename string, columns *orderedmap.Order
placeholders) placeholders)
} }
func (ch *ClickHouse) writeMetrics(tablename string, columns *orderedmap.OrderedMap[string, string], metrics []map[string]interface{}) error { func (ch *ClickHouse) writeToDB(metrics []telegraf.Metric) error {
err := ch.ensureTable(tablename, columns)
if err != nil {
return err
}
sql := ch.generateInsert(tablename, columns, len(metrics))
tx, err := ch.db.Begin()
if err != nil {
return fmt.Errorf("begin failed: %w", err)
}
stmt, err := tx.Prepare(sql)
if err != nil {
return fmt.Errorf("prepare failed: %w", err)
}
defer stmt.Close()
values := make([][]interface{}, 0, len(metrics))
for _, metric := range metrics {
value := make([]interface{}, 0, columns.Len())
for pair := columns.Oldest(); pair != nil; pair = pair.Next() {
value = append(value, metric[pair.Key])
}
values = append(values, value)
}
for _, value := range values {
_, err = stmt.Exec(value...)
if err != nil {
return fmt.Errorf("exec failed: %w", err)
}
}
err = tx.Commit()
if err != nil {
return fmt.Errorf("commit failed: %w", err)
}
return nil
}
func (ch *ClickHouse) WriteMultiTable(metrics []telegraf.Metric) error {
metricsData := make(map[string][]map[string]interface{}) metricsData := make(map[string][]map[string]interface{})
columns := make(map[string]*orderedmap.OrderedMap[string, string]) columns := make(map[string]*orderedmap.OrderedMap[string, string])
start := time.Now() start := time.Now()
for _, metric := range metrics { ch.pepareMetrics(metrics, metricsData, columns)
tablename := metric.Name()
if ch.MultiTableOptions.TablePrefix != "" {
tablename = ch.MultiTableOptions.TablePrefix + "_" + tablename
}
if _, ok := metricsData[tablename]; !ok {
metricsData[tablename] = make([]map[string]interface{}, 0, len(metrics))
}
if _, ok := columns[tablename]; !ok {
columns[tablename] = orderedmap.New[string, string](len(metrics))
}
metricEntry := make(map[string]interface{})
metricEntry[ch.TimestampColumn] = metric.Time()
columns[tablename].Set(ch.TimestampColumn, ch.toDatatype(metric.Time()))
for _, tag := range metric.TagList() {
metricEntry[tag.Key] = tag.Value
columns[tablename].Set(tag.Key, ch.toDatatype(tag.Value))
}
for _, field := range metric.FieldList() {
metricEntry[field.Key] = field.Value
columns[tablename].Set(field.Key, ch.toDatatype(field.Value))
}
metricsData[tablename] = append(metricsData[tablename], metricEntry)
}
ch.Log.Infof("Prepared %d metrics for writing in %s\n", len(metrics), time.Since(start)) ch.Log.Infof("Prepared %d metrics for writing in %s\n", len(metrics), time.Since(start))
start = time.Now() start = time.Now()
for tablename, metrics := range metricsData { for tablename, metrics := range metricsData {
err := ch.writeMetrics(tablename, columns[tablename], metrics) tableColumns := columns[tablename]
err := ch.ensureTable(tablename, tableColumns)
if err != nil { if err != nil {
return err return err
} }
sql := ch.generateInsert(tablename, tableColumns, len(metrics))
tx, err := ch.db.Begin()
if err != nil {
return fmt.Errorf("begin failed: %w", err)
}
stmt, err := tx.Prepare(sql)
if err != nil {
return fmt.Errorf("prepare failed: %w", err)
}
defer stmt.Close()
values := make([][]interface{}, 0, len(metrics))
for _, metric := range metrics {
value := make([]interface{}, 0, tableColumns.Len())
for pair := tableColumns.Oldest(); pair != nil; pair = pair.Next() {
value = append(value, metric[pair.Key])
}
values = append(values, value)
}
for _, value := range values {
_, err = stmt.Exec(value...)
if err != nil {
return fmt.Errorf("exec failed: %w", err)
}
}
err = tx.Commit()
if err != nil {
return fmt.Errorf("commit failed: %w", err)
}
} }
ch.Log.Infof("Wrote %d metrics to %d tables in %s\n", len(metrics), len(metricsData), time.Since(start)) ch.Log.Infof("Wrote %d metrics to %d tables in %s\n", len(metrics), len(metricsData), time.Since(start))
return nil return nil
} }
func (ch *ClickHouse) WriteSingleTable(metrics []telegraf.Metric) error { func (ch *ClickHouse) backgroundWriter(delay time.Duration) {
tablename := ch.SingleTableOptions.TableName timer := time.NewTimer(delay)
metricsData := make([]map[string]interface{}, 0, len(metrics)) defer timer.Stop()
columns := orderedmap.New[string, string](len(metrics))
start := time.Now() for {
for _, metric := range metrics { select {
metricName := metric.Name() case <-timer.C:
ch.metricLock.RLock()
metrics := ch.metricQueue
ch.metricLock.RUnlock()
metricEntry := make(map[string]interface{}) if len(metrics) > 0 {
metricEntry[ch.TimestampColumn] = metric.Time() ch.metricLock.Lock()
columns.Set(ch.TimestampColumn, ch.toDatatype(metric.Time())) ch.metricQueue = make([]telegraf.Metric, 0, ch.QueueInitialSize)
ch.metricLock.Unlock()
metricEntry["measurement"] = metricName err := ch.writeToDB(metrics)
columns.Set("measurement", ch.toDatatype(metricName)) if err != nil {
ch.Log.Error("Error writing to ClickHouse: ", err)
}
}
for _, tag := range metric.TagList() { timer.Reset(delay)
colName := fmt.Sprintf("%s_%s", metricName, tag.Key)
metricEntry[colName] = tag.Value case <-ch.metricTrigger:
columns.Set(colName, ch.toDatatype(tag.Value)) ch.metricLock.RLock()
metricsLength := len(ch.metricQueue)
ch.metricLock.RUnlock()
if metricsLength < ch.QueueFlushSize {
if !timer.Stop() {
<-timer.C
}
timer.Reset(delay)
}
} }
for _, field := range metric.FieldList() {
colName := fmt.Sprintf("%s_%s", metricName, field.Key)
metricEntry[colName] = field.Value
columns.Set(colName, ch.toDatatype(field.Value))
}
metricsData = append(metricsData, metricEntry)
} }
ch.Log.Infof("Prepared %d metrics for writing in %s\n", len(metrics), time.Since(start))
start = time.Now()
err := ch.writeMetrics(tablename, columns, metricsData)
if err != nil {
return err
}
ch.Log.Infof("Wrote %d metrics to %s in %s\n", len(metrics), tablename, time.Since(start))
return nil
} }
func (ch *ClickHouse) Write(metrics []telegraf.Metric) error { func (ch *ClickHouse) Write(metrics []telegraf.Metric) error {
@@ -462,52 +432,6 @@ func (ch *ClickHouse) Write(metrics []telegraf.Metric) error {
return nil return nil
} }
func (ch *ClickHouse) backgroundWriter(delay time.Duration) {
timer := time.NewTimer(delay)
defer timer.Stop()
for {
select {
case <-timer.C:
ch.metricLock.RLock()
metrics := ch.metricQueue
ch.metricLock.RUnlock()
if len(metrics) > 0 {
ch.metricLock.Lock()
ch.metricQueue = make([]telegraf.Metric, 0, ch.QueueInitialSize)
ch.metricLock.Unlock()
if ch.TableMode == "single" {
err := ch.WriteSingleTable(metrics)
if err != nil {
ch.Log.Error("Error writing to ClickHouse: ", err)
}
} else {
err := ch.WriteMultiTable(metrics)
if err != nil {
ch.Log.Error("Error writing to ClickHouse: ", err)
}
}
}
timer.Reset(delay)
case <-ch.metricTrigger:
ch.metricLock.RLock()
metricsLength := len(ch.metricQueue)
ch.metricLock.RUnlock()
if metricsLength < ch.QueueFlushSize {
if !timer.Stop() {
<-timer.C
}
timer.Reset(delay)
}
}
}
}
func init() { func init() {
outputs.Add("clickhouse", func() telegraf.Output { outputs.Add("clickhouse", func() telegraf.Output {
return &ClickHouse{ return &ClickHouse{

View File

@@ -11,20 +11,8 @@
## Default TTL for data in the table (use ClickHouse syntax) ## Default TTL for data in the table (use ClickHouse syntax)
ttl = "3 MONTH" ttl = "3 MONTH"
## Table operation mode ## Table name prefix
## Set to "single" to create a single table for all metrics. # table_prefix = "telegraf"
## Set to "multi" to create a new table for each metric.
# table_mode = "multi"
## Single table configuration
# [outputs.clickhouse.single_table]
## Table name
# table_name = "telegraf"
## Multi table configuration
# [outputs.clickhouse.multi_table]
## Table name prefix
# table_prefix = "telegraf"
## Initial metric queue size ## Initial metric queue size
## Queue resizes automatically if the queue becomes too large. ## Queue resizes automatically if the queue becomes too large.