diff --git a/.gitignore b/.gitignore index fd06c3c..3b735ec 100644 --- a/.gitignore +++ b/.gitignore @@ -7,7 +7,6 @@ *.dll *.so *.dylib -clickhouse_* # Test binary, built with `go test -c` *.test diff --git a/plugins/outputs/clickhouse/clickhouse.go b/plugins/outputs/clickhouse/clickhouse.go index 76bd235..568dd4f 100644 --- a/plugins/outputs/clickhouse/clickhouse.go +++ b/plugins/outputs/clickhouse/clickhouse.go @@ -20,30 +20,20 @@ import ( //go:embed sample.conf var sampleConfig string -type SingleTableOptions struct { - TableName string `toml:"table_name"` -} - -type MultiTableOptions struct { - TablePrefix string `toml:"table_prefix"` -} - type ClickHouse struct { - DataSourceName string `toml:"data_source_name"` - InitSQL string `toml:"init_sql"` - TimestampColumn string `toml:"timestamp_column"` - TTL string `toml:"ttl"` - TableMode string `toml:"table_mode"` - SingleTableOptions SingleTableOptions `toml:"single_table"` - MultiTableOptions MultiTableOptions `toml:"multi_table"` - QueueInitialSize int `toml:"queue_initial_size"` - QueueMaxSize int `toml:"queue_max_size"` - QueueFlushSize int `toml:"queue_flush_size"` - QueueFlushInterval time.Duration `toml:"queue_flush_interval"` - ConnectionMaxIdleTime time.Duration `toml:"connection_max_idle_time"` - ConnectionMaxLifetime time.Duration `toml:"connection_max_lifetime"` - ConnectionMaxIdle int `toml:"connection_max_idle"` - ConnectionMaxOpen int `toml:"connection_max_open"` + DataSourceName string `toml:"data_source_name"` + InitSQL string `toml:"init_sql"` + TimestampColumn string `toml:"timestamp_column"` + TTL string `toml:"ttl"` + TablePrefix string `toml:"table_prefix"` + QueueInitialSize int `toml:"queue_initial_size"` + QueueMaxSize int `toml:"queue_max_size"` + QueueFlushSize int `toml:"queue_flush_size"` + QueueFlushInterval time.Duration `toml:"queue_flush_interval"` + ConnectionMaxIdleTime time.Duration `toml:"connection_max_idle_time"` + ConnectionMaxLifetime time.Duration 
`toml:"connection_max_lifetime"` + ConnectionMaxIdle int `toml:"connection_max_idle"` + ConnectionMaxOpen int `toml:"connection_max_open"` db *sql.DB Log telegraf.Logger @@ -66,18 +56,6 @@ func (ch *ClickHouse) Init() error { ch.Log.Info("timestamp_column is not set, using default value: ", ch.TimestampColumn) } - if ch.TableMode == "" { - ch.TableMode = "multi" - ch.Log.Info("table_mode is not set, using default value: ", ch.TableMode) - } else if ch.TableMode != "single" && ch.TableMode != "multi" { - return fmt.Errorf("table_mode must be one of: single, multi") - } - - if ch.TableMode == "single" && ch.SingleTableOptions.TableName == "" { - ch.SingleTableOptions.TableName = "telegraf" - ch.Log.Info("table_name is not set, using default value: ", ch.SingleTableOptions.TableName) - } - if ch.QueueInitialSize <= 0 { ch.QueueInitialSize = 100000 ch.Log.Info("queue_initial_size is not set, using default value: ", ch.QueueInitialSize) @@ -187,6 +165,41 @@ func (ch *ClickHouse) toNullable(pair *orderedmap.Pair[string, string]) string { return pair.Value } +func (ch *ClickHouse) prepareMetrics(metrics []telegraf.Metric, metricsData map[string][]map[string]interface{}, columns map[string]*orderedmap.OrderedMap[string, string]) { + for _, metric := range metrics { + tablename := metric.Name() + + if ch.TablePrefix != "" { + tablename = ch.TablePrefix + "_" + tablename + } + + if _, ok := metricsData[tablename]; !ok { + metricsData[tablename] = make([]map[string]interface{}, 0, len(metrics)) + } + + if _, ok := columns[tablename]; !ok { + columns[tablename] = orderedmap.New[string, string](len(metrics)) + } + + metricEntry := make(map[string]interface{}) + + metricEntry[ch.TimestampColumn] = metric.Time() + columns[tablename].Set(ch.TimestampColumn, ch.toDatatype(metric.Time())) + + for _, tag := range metric.TagList() { + metricEntry[tag.Key] = tag.Value + columns[tablename].Set(tag.Key, ch.toDatatype(tag.Value)) + } + + for _, field := range metric.FieldList() { + 
metricEntry[field.Key] = field.Value + columns[tablename].Set(field.Key, ch.toDatatype(field.Value)) + } + + metricsData[tablename] = append(metricsData[tablename], metricEntry) + } +} + func (ch *ClickHouse) generateCreateTable(tablename string, columns *orderedmap.OrderedMap[string, string]) string { columnDefs := make([]string, 0, columns.Len()) for pair := columns.Oldest(); pair != nil; pair = pair.Next() { @@ -198,9 +211,6 @@ func (ch *ClickHouse) generateCreateTable(tablename string, columns *orderedmap. orderBy = append(orderBy, "host") } orderBy = append(orderBy, quoteIdent(ch.TimestampColumn)) - if _, ok := columns.Get("measurement"); ok { - orderBy = append(orderBy, "measurement") - } createTable := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (%s) ENGINE = MergeTree() ORDER BY (%s) PARTITION BY toYYYYMM(%s)", quoteIdent(tablename), @@ -304,140 +314,100 @@ func (ch *ClickHouse) generateInsert(tablename string, columns *orderedmap.Order placeholders) } -func (ch *ClickHouse) writeMetrics(tablename string, columns *orderedmap.OrderedMap[string, string], metrics []map[string]interface{}) error { - err := ch.ensureTable(tablename, columns) - if err != nil { - return err - } - - sql := ch.generateInsert(tablename, columns, len(metrics)) - - tx, err := ch.db.Begin() - if err != nil { - return fmt.Errorf("begin failed: %w", err) - } - - stmt, err := tx.Prepare(sql) - if err != nil { - return fmt.Errorf("prepare failed: %w", err) - } - defer stmt.Close() - - values := make([][]interface{}, 0, len(metrics)) - - for _, metric := range metrics { - value := make([]interface{}, 0, columns.Len()) - for pair := columns.Oldest(); pair != nil; pair = pair.Next() { - value = append(value, metric[pair.Key]) - } - values = append(values, value) - } - - for _, value := range values { - _, err = stmt.Exec(value...) 
- if err != nil { - return fmt.Errorf("exec failed: %w", err) - } - } - - err = tx.Commit() - if err != nil { - return fmt.Errorf("commit failed: %w", err) - } - return nil -} - -func (ch *ClickHouse) WriteMultiTable(metrics []telegraf.Metric) error { +func (ch *ClickHouse) writeToDB(metrics []telegraf.Metric) error { metricsData := make(map[string][]map[string]interface{}) columns := make(map[string]*orderedmap.OrderedMap[string, string]) start := time.Now() - for _, metric := range metrics { - tablename := metric.Name() - - if ch.MultiTableOptions.TablePrefix != "" { - tablename = ch.MultiTableOptions.TablePrefix + "_" + tablename - } - - if _, ok := metricsData[tablename]; !ok { - metricsData[tablename] = make([]map[string]interface{}, 0, len(metrics)) - } - - if _, ok := columns[tablename]; !ok { - columns[tablename] = orderedmap.New[string, string](len(metrics)) - } - - metricEntry := make(map[string]interface{}) - - metricEntry[ch.TimestampColumn] = metric.Time() - columns[tablename].Set(ch.TimestampColumn, ch.toDatatype(metric.Time())) - - for _, tag := range metric.TagList() { - metricEntry[tag.Key] = tag.Value - columns[tablename].Set(tag.Key, ch.toDatatype(tag.Value)) - } - - for _, field := range metric.FieldList() { - metricEntry[field.Key] = field.Value - columns[tablename].Set(field.Key, ch.toDatatype(field.Value)) - } - - metricsData[tablename] = append(metricsData[tablename], metricEntry) - } + ch.prepareMetrics(metrics, metricsData, columns) ch.Log.Infof("Prepared %d metrics for writing in %s\n", len(metrics), time.Since(start)) start = time.Now() for tablename, metrics := range metricsData { - err := ch.writeMetrics(tablename, columns[tablename], metrics) + tableColumns := columns[tablename] + + err := ch.ensureTable(tablename, tableColumns) if err != nil { return err } + + sql := ch.generateInsert(tablename, tableColumns, len(metrics)) + + tx, err := ch.db.Begin() + if err != nil { + return fmt.Errorf("begin failed: %w", err) + } + + stmt, err := 
tx.Prepare(sql) + if err != nil { + return fmt.Errorf("prepare failed: %w", err) + } + defer stmt.Close() + + values := make([][]interface{}, 0, len(metrics)) + + for _, metric := range metrics { + value := make([]interface{}, 0, tableColumns.Len()) + for pair := tableColumns.Oldest(); pair != nil; pair = pair.Next() { + value = append(value, metric[pair.Key]) + } + values = append(values, value) + } + + for _, value := range values { + _, err = stmt.Exec(value...) + if err != nil { + return fmt.Errorf("exec failed: %w", err) + } + } + + err = tx.Commit() + if err != nil { + return fmt.Errorf("commit failed: %w", err) + } } ch.Log.Infof("Wrote %d metrics to %d tables in %s\n", len(metrics), len(metricsData), time.Since(start)) return nil } -func (ch *ClickHouse) WriteSingleTable(metrics []telegraf.Metric) error { - tablename := ch.SingleTableOptions.TableName - metricsData := make([]map[string]interface{}, 0, len(metrics)) - columns := orderedmap.New[string, string](len(metrics)) +func (ch *ClickHouse) backgroundWriter(delay time.Duration) { + timer := time.NewTimer(delay) + defer timer.Stop() - start := time.Now() - for _, metric := range metrics { - metricName := metric.Name() + for { + select { + case <-timer.C: + ch.metricLock.RLock() + metrics := ch.metricQueue + ch.metricLock.RUnlock() - metricEntry := make(map[string]interface{}) - metricEntry[ch.TimestampColumn] = metric.Time() - columns.Set(ch.TimestampColumn, ch.toDatatype(metric.Time())) + if len(metrics) > 0 { + ch.metricLock.Lock() + ch.metricQueue = make([]telegraf.Metric, 0, ch.QueueInitialSize) + ch.metricLock.Unlock() - metricEntry["measurement"] = metricName - columns.Set("measurement", ch.toDatatype(metricName)) + err := ch.writeToDB(metrics) + if err != nil { + ch.Log.Error("Error writing to ClickHouse: ", err) + } + } - for _, tag := range metric.TagList() { - colName := fmt.Sprintf("%s_%s", metricName, tag.Key) - metricEntry[colName] = tag.Value - columns.Set(colName, ch.toDatatype(tag.Value)) 
+ timer.Reset(delay) + + case <-ch.metricTrigger: + ch.metricLock.RLock() + metricsLength := len(ch.metricQueue) + ch.metricLock.RUnlock() + + if metricsLength < ch.QueueFlushSize { + if !timer.Stop() { + <-timer.C + } + timer.Reset(delay) + } } - - for _, field := range metric.FieldList() { - colName := fmt.Sprintf("%s_%s", metricName, field.Key) - metricEntry[colName] = field.Value - columns.Set(colName, ch.toDatatype(field.Value)) - } - - metricsData = append(metricsData, metricEntry) } - ch.Log.Infof("Prepared %d metrics for writing in %s\n", len(metrics), time.Since(start)) - - start = time.Now() - err := ch.writeMetrics(tablename, columns, metricsData) - if err != nil { - return err - } - ch.Log.Infof("Wrote %d metrics to %s in %s\n", len(metrics), tablename, time.Since(start)) - - return nil } func (ch *ClickHouse) Write(metrics []telegraf.Metric) error { @@ -462,52 +432,6 @@ func (ch *ClickHouse) Write(metrics []telegraf.Metric) error { return nil } -func (ch *ClickHouse) backgroundWriter(delay time.Duration) { - timer := time.NewTimer(delay) - defer timer.Stop() - - for { - select { - case <-timer.C: - ch.metricLock.RLock() - metrics := ch.metricQueue - ch.metricLock.RUnlock() - - if len(metrics) > 0 { - ch.metricLock.Lock() - ch.metricQueue = make([]telegraf.Metric, 0, ch.QueueInitialSize) - ch.metricLock.Unlock() - - if ch.TableMode == "single" { - err := ch.WriteSingleTable(metrics) - if err != nil { - ch.Log.Error("Error writing to ClickHouse: ", err) - } - } else { - err := ch.WriteMultiTable(metrics) - if err != nil { - ch.Log.Error("Error writing to ClickHouse: ", err) - } - } - } - - timer.Reset(delay) - - case <-ch.metricTrigger: - ch.metricLock.RLock() - metricsLength := len(ch.metricQueue) - ch.metricLock.RUnlock() - - if metricsLength < ch.QueueFlushSize { - if !timer.Stop() { - <-timer.C - } - timer.Reset(delay) - } - } - } -} - func init() { outputs.Add("clickhouse", func() telegraf.Output { return &ClickHouse{ diff --git 
a/plugins/outputs/clickhouse/sample.conf b/plugins/outputs/clickhouse/sample.conf index ccfbbfb..c11d3d9 100644 --- a/plugins/outputs/clickhouse/sample.conf +++ b/plugins/outputs/clickhouse/sample.conf @@ -11,20 +11,8 @@ ## Default TTL for data in the table (use ClickHouse syntax) ttl = "3 MONTH" - ## Table operation mode - ## Set to "single" to create a single table for all metrics. - ## Set to "multi" to create a new table for each metric. - # table_mode = "multi" - - ## Single table configuration - # [outputs.clickhouse.single_table] - ## Table name - # table_name = "telegraf" - - ## Multi table configuration - # [outputs.clickhouse.multi_table] - ## Table name prefix - # table_prefix = "telegraf" + ## Table name prefix + # table_prefix = "telegraf" ## Initial metric queue size ## Queue resizes automatically if the queue becomes too large.