diff --git a/.gitignore b/.gitignore index 3b735ec..b8a154d 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,7 @@ *.dll *.so *.dylib +clickhouse # Test binary, built with `go test -c` *.test diff --git a/clickhouse b/clickhouse deleted file mode 100755 index 27e6088..0000000 Binary files a/clickhouse and /dev/null differ diff --git a/plugins/outputs/clickhouse/clickhouse.go b/plugins/outputs/clickhouse/clickhouse.go index ba6d58a..8e55526 100644 --- a/plugins/outputs/clickhouse/clickhouse.go +++ b/plugins/outputs/clickhouse/clickhouse.go @@ -2,16 +2,17 @@ package sql import ( - gosql "database/sql" + "database/sql" _ "embed" "fmt" + "math" "strings" + "sync" "time" _ "github.com/ClickHouse/clickhouse-go" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/plugins/outputs" "github.com/wk8/go-ordered-map/v2" ) @@ -35,13 +36,19 @@ type ClickHouse struct { TableMode string `toml:"table_mode"` SingleTableOptions SingleTableOptions `toml:"single_table"` MultiTableOptions MultiTableOptions `toml:"multi_table"` - ConnectionMaxIdleTime config.Duration `toml:"connection_max_idle_time"` - ConnectionMaxLifetime config.Duration `toml:"connection_max_lifetime"` + QueueSize int `toml:"queue_size"` + QueueLimit int `toml:"queue_limit"` + FlushInterval time.Duration `toml:"flush_interval"` + ConnectionMaxIdleTime time.Duration `toml:"connection_max_idle_time"` + ConnectionMaxLifetime time.Duration `toml:"connection_max_lifetime"` ConnectionMaxIdle int `toml:"connection_max_idle"` ConnectionMaxOpen int `toml:"connection_max_open"` - db *gosql.DB - Log telegraf.Logger `toml:"-"` + db *sql.DB + Log telegraf.Logger + metricQueue []telegraf.Metric + metricLock sync.Mutex + metricTrigger chan struct{} } func (*ClickHouse) SampleConfig() string { @@ -54,26 +61,47 @@ func (p *ClickHouse) Init() error { } if p.TimestampColumn == "" { - fmt.Println("timestamp_column is not set, using default value: timestamp") p.TimestampColumn = 
"timestamp" + p.Log.Info("timestamp_column is not set, using default value:", p.TimestampColumn) } if p.TableMode == "" { - fmt.Println("table_mode is not set, using default value: single") - p.TableMode = "single" + p.TableMode = "multi" + p.Log.Info("table_mode is not set, using default value:", p.TableMode) } else if p.TableMode != "single" && p.TableMode != "multi" { return fmt.Errorf("table_mode must be one of: single, multi") } if p.TableMode == "single" && p.SingleTableOptions.TableName == "" { p.SingleTableOptions.TableName = "telegraf" + p.Log.Info("table_name is not set, using default value:", p.SingleTableOptions.TableName) } + if p.QueueSize <= 0 { + p.QueueSize = 100000 + p.Log.Info("queue_size is not set, using default value:", p.QueueSize) + } + + if p.QueueLimit <= 0 { + p.QueueLimit = int(math.MaxUint64 >> 1) + p.Log.Info("queue_limit is not set, using default value:", p.QueueLimit) + } + + if p.FlushInterval <= 0 { + p.FlushInterval = 5 * time.Second + p.Log.Info("flush_interval is not set, using default value:", p.FlushInterval) + } + + p.metricQueue = make([]telegraf.Metric, 0, p.QueueSize) + p.metricTrigger = make(chan struct{}, 1) + + go p.metricWriter(p.FlushInterval) + return nil } func (p *ClickHouse) Connect() error { - db, err := gosql.Open("clickhouse", p.DataSourceName) + db, err := sql.Open("clickhouse", p.DataSourceName) if err != nil { return err } @@ -96,7 +124,7 @@ func (p *ClickHouse) Connect() error { } p.db = db - fmt.Println("Connected to ClickHouse!") + p.Log.Info("Connected to ClickHouse!") return nil } @@ -122,7 +150,7 @@ func quoteIdent(name string) string { return `"` + strings.ReplaceAll(sanitizeQuoted(name), `"`, `""`) + `"` } -func (p *ClickHouse) deriveDatatype(value interface{}) string { +func (p *ClickHouse) valueToDatatype(value interface{}) string { var datatype string switch value.(type) { @@ -146,6 +174,36 @@ func (p *ClickHouse) deriveDatatype(value interface{}) string { return datatype } +// TODO: fix this 
fuckyness +type NullUint64 struct { + Uint64 uint64 + Valid bool +} + +func (p *ClickHouse) datatypeToNullable(datatype string) interface{} { + var nullable interface{} + + switch datatype { + case "Int64": + nullable = int64(0) // sql.NullInt64{} + case "UInt64": + nullable = uint64(0) // NullUint64{} + case "Float64": + nullable = float64(0) // sql.NullFloat64{} + case "String": + nullable = "" // sql.NullString{} + case "UInt8": + nullable = false // sql.NullBool{} + case "DateTime": + nullable = time.Unix(0, 0) // sql.NullTime{} + default: + nullable = "" // sql.NullString{} + p.Log.Errorf("unknown datatype %s", datatype) + } + + return nullable +} + func (p *ClickHouse) generateCreateTable(tablename string, columns *orderedmap.OrderedMap[string, string]) string { columnDefs := make([]string, 0, columns.Len()) @@ -190,7 +248,7 @@ func (p *ClickHouse) generateAlterTable(tablename string, columns *orderedmap.Or } func (p *ClickHouse) ensureTable(tablename string, columns *orderedmap.OrderedMap[string, string]) error { - var res *gosql.Rows + var res *sql.Rows var err error for { @@ -203,7 +261,7 @@ func (p *ClickHouse) ensureTable(tablename string, columns *orderedmap.OrderedMa if err != nil { return err } - fmt.Println("Created table", tablename) + p.Log.Info("Created table", tablename) continue } return err @@ -232,7 +290,7 @@ func (p *ClickHouse) ensureTable(tablename string, columns *orderedmap.OrderedMa if err != nil { return err } - fmt.Println("Altered table", tablename) + p.Log.Info("Altered table", tablename) break } } @@ -249,12 +307,22 @@ func (p *ClickHouse) generateInsert(tablename string, columns *orderedmap.Ordere placeholder := "(" + strings.Repeat("?,", columns.Len()-1) + "?)" placeholders := strings.Repeat(placeholder+",", batchSize-1) + placeholder - return fmt.Sprintf("INSERT INTO %s(%s) VALUES %s", + return fmt.Sprintf("INSERT INTO %s (%s) VALUES %s", quoteIdent(tablename), strings.Join(quotedColumns, ", "), placeholders) } +func (p 
*ClickHouse) nullifyMissingValues(columns *orderedmap.OrderedMap[string, string], metrics []map[string]interface{}) { + for _, metric := range metrics { + for pair := columns.Oldest(); pair != nil; pair = pair.Next() { + if _, ok := metric[pair.Key]; !ok { + metric[pair.Key] = p.datatypeToNullable(pair.Value) + } + } + } +} + func (p *ClickHouse) writeMetrics(tablename string, columns *orderedmap.OrderedMap[string, string], metrics []map[string]interface{}) error { err := p.ensureTable(tablename, columns) if err != nil { @@ -262,7 +330,6 @@ func (p *ClickHouse) writeMetrics(tablename string, columns *orderedmap.OrderedM } sql := p.generateInsert(tablename, columns, len(metrics)) - values := make([]interface{}, 0, len(metrics)*columns.Len()) tx, err := p.db.Begin() if err != nil { @@ -275,18 +342,25 @@ func (p *ClickHouse) writeMetrics(tablename string, columns *orderedmap.OrderedM } defer stmt.Close() + values := make([][]interface{}, 0, len(metrics)) + for _, metric := range metrics { + value := make([]interface{}, 0, columns.Len()) for pair := columns.Oldest(); pair != nil; pair = pair.Next() { - values = append(values, metric[pair.Key]) + value = append(value, metric[pair.Key]) } + values = append(values, value) } - start := time.Now() - _, err = stmt.Exec(values...) + for _, value := range values { + _, err = stmt.Exec(value...) 
+ if err != nil { + return fmt.Errorf("exec failed: %w", err) + } + } if err != nil { return fmt.Errorf("exec failed: %w", err) } - p.Log.Debugf("Wrote %d metrics to %s in %s", len(metrics), tablename, time.Since(start)) err = tx.Commit() if err != nil { @@ -318,21 +392,25 @@ func (p *ClickHouse) WriteMultiTable(metrics []telegraf.Metric) error { metricEntry := make(map[string]interface{}) metricEntry[p.TimestampColumn] = metric.Time() - columns[tablename].Set(p.TimestampColumn, p.deriveDatatype(metric.Time())) + columns[tablename].Set(p.TimestampColumn, p.valueToDatatype(metric.Time())) for _, tag := range metric.TagList() { metricEntry[tag.Key] = tag.Value - columns[tablename].Set(tag.Key, p.deriveDatatype(tag.Value)) + columns[tablename].Set(tag.Key, p.valueToDatatype(tag.Value)) } for _, field := range metric.FieldList() { metricEntry[field.Key] = field.Value - columns[tablename].Set(field.Key, p.deriveDatatype(field.Value)) + columns[tablename].Set(field.Key, p.valueToDatatype(field.Value)) } metricsData[tablename] = append(metricsData[tablename], metricEntry) } - p.Log.Debugf("Prepared %d metrics for writing in %s", len(metrics), time.Since(start)) + + for tablename, metrics := range metricsData { + p.nullifyMissingValues(columns[tablename], metrics) + } + p.Log.Infof("Prepared %d metrics for writing in %s\n", len(metrics), time.Since(start)) start = time.Now() for tablename, metrics := range metricsData { @@ -341,7 +419,7 @@ func (p *ClickHouse) WriteMultiTable(metrics []telegraf.Metric) error { return err } } - p.Log.Debugf("Wrote %d metrics to %d tables in %s", len(metrics), len(metricsData), time.Since(start)) + p.Log.Infof("Wrote %d metrics to %d tables in %s\n", len(metrics), len(metricsData), time.Since(start)) return nil } @@ -357,43 +435,97 @@ func (p *ClickHouse) WriteSingleTable(metrics []telegraf.Metric) error { metricEntry := make(map[string]interface{}) metricEntry[p.TimestampColumn] = metric.Time() - columns.Set(p.TimestampColumn, 
p.deriveDatatype(metric.Time())) + columns.Set(p.TimestampColumn, p.valueToDatatype(metric.Time())) metricEntry["measurement"] = metricName - columns.Set("measurement", p.deriveDatatype(metricName)) + columns.Set("measurement", p.valueToDatatype(metricName)) for _, tag := range metric.TagList() { colName := fmt.Sprintf("%s_%s", metricName, tag.Key) metricEntry[colName] = tag.Value - columns.Set(colName, p.deriveDatatype(tag.Value)) + columns.Set(colName, p.valueToDatatype(tag.Value)) } for _, field := range metric.FieldList() { colName := fmt.Sprintf("%s_%s", metricName, field.Key) metricEntry[colName] = field.Value - columns.Set(colName, p.deriveDatatype(field.Value)) + columns.Set(colName, p.valueToDatatype(field.Value)) + } + + for pair := columns.Oldest(); pair != nil; pair = pair.Next() { + if _, ok := metricEntry[pair.Key]; !ok { + metricEntry[pair.Key] = p.datatypeToNullable(pair.Value) + } } metricsData = append(metricsData, metricEntry) } - p.Log.Debugf("Prepared %d metrics for writing in %s", len(metrics), time.Since(start)) + + p.nullifyMissingValues(columns, metricsData) + p.Log.Infof("Prepared %d metrics for writing in %s\n", len(metrics), time.Since(start)) start = time.Now() err := p.writeMetrics(tablename, columns, metricsData) if err != nil { return err } - p.Log.Debugf("Wrote %d metrics to %s in %s", len(metrics), tablename, time.Since(start)) + p.Log.Infof("Wrote %d metrics to %s in %s\n", len(metrics), tablename, time.Since(start)) return nil } func (p *ClickHouse) Write(metrics []telegraf.Metric) error { - if p.TableMode == "single" { - return p.WriteSingleTable(metrics) + p.metricLock.Lock() + if len(p.metricQueue) >= p.QueueLimit { + p.Log.Errorf("Metrics queue is full (%d/%d), dropping metrics", len(p.metricQueue), p.QueueLimit) + } else { + p.metricQueue = append(p.metricQueue, metrics...) 
+ } + p.metricLock.Unlock() + + select { + case p.metricTrigger <- struct{}{}: + default: } - return p.WriteMultiTable(metrics) + return nil +} + +func (p *ClickHouse) metricWriter(delay time.Duration) { + timer := time.NewTimer(delay) + defer timer.Stop() + + for { + select { + case <-timer.C: + p.metricLock.Lock() + metrics := p.metricQueue + p.metricQueue = nil + p.metricLock.Unlock() + + if len(metrics) > 0 { + if p.TableMode == "single" { + err := p.WriteSingleTable(metrics) + if err != nil { + p.Log.Errorf("Error writing to ClickHouse: %s", err) + } + } else { + err := p.WriteMultiTable(metrics) + if err != nil { + p.Log.Errorf("Error writing to ClickHouse: %s", err) + } + } + } + + timer.Reset(delay) + + case <-p.metricTrigger: + if !timer.Stop() { + <-timer.C + } + timer.Reset(delay) + } + } } func init() { diff --git a/plugins/outputs/clickhouse/sample.conf b/plugins/outputs/clickhouse/sample.conf index f3b8771..3423a7a 100644 --- a/plugins/outputs/clickhouse/sample.conf +++ b/plugins/outputs/clickhouse/sample.conf @@ -1,11 +1,43 @@ # Save metrics to an SQL Database -[[outputs.sql]] +[[outputs.clickhouse]] ## Data source name - # data_source_name = "" + data_source_name = "" ## Initialization SQL # init_sql = "" + ## Timestamp column name + # timestamp_column = "timestamp" + + ## Default TTL for data in the table (use ClickHouse syntax) + ttl = "3 MONTH" + + ## Table operation mode + ## Set to "single" to create a single table for all metrics. + ## Set to "multi" to create a new table for each metric. + # table_mode = "multi" + + ## Single table configuration + # [outputs.clickhouse.single_table] + ## Table name + # table_name = "telegraf" + + ## Multi table configuration + # [outputs.clickhouse.multi_table] + ## Table name prefix + # table_prefix = "telegraf" + + ## Initial metric queue size, resizes automatically if the queue becomes too large + # queue_size = 100000 + + ## Maximum queue size, 0 means unlimited. 
+ ## If the queue reaches this size, new writes will be dropped until it drains. + # queue_limit = 0 + + ## Flush interval for the metric queue + ## The agent waits until N seconds have passed without any writes before flushing metrics to ClickHouse. + # flush_interval = "5s" + ## Maximum amount of time a connection may be idle. "0s" means connections are ## never closed due to idle time. # connection_max_idle_time = "0s"