diff --git a/plugins/outputs/clickhouse/clickhouse.go b/plugins/outputs/clickhouse/clickhouse.go index b6a7724..faa7801 100644 --- a/plugins/outputs/clickhouse/clickhouse.go +++ b/plugins/outputs/clickhouse/clickhouse.go @@ -62,34 +62,34 @@ func (ch *ClickHouse) Init() error { if ch.TimestampColumn == "" { ch.TimestampColumn = "timestamp" - ch.Log.Info("timestamp_column is not set, using default value:", ch.TimestampColumn) + ch.Log.Info("timestamp_column is not set, using default value: ", ch.TimestampColumn) } if ch.TableMode == "" { ch.TableMode = "multi" - ch.Log.Info("table_mode is not set, using default value:", ch.TableMode) + ch.Log.Info("table_mode is not set, using default value: ", ch.TableMode) } else if ch.TableMode != "single" && ch.TableMode != "multi" { return fmt.Errorf("table_mode must be one of: single, multi") } if ch.TableMode == "single" && ch.SingleTableOptions.TableName == "" { ch.SingleTableOptions.TableName = "telegraf" - ch.Log.Info("table_name is not set, using default value:", ch.SingleTableOptions.TableName) + ch.Log.Info("table_name is not set, using default value: ", ch.SingleTableOptions.TableName) } if ch.QueueSize <= 0 { ch.QueueSize = 100000 - ch.Log.Info("queue_size is not set, using default value:", ch.QueueSize) + ch.Log.Info("queue_size is not set, using default value: ", ch.QueueSize) } if ch.QueueLimit <= 0 { ch.QueueLimit = int(math.MaxUint64 >> 1) - ch.Log.Info("queue_limit is not set, using default value:", ch.QueueLimit) + ch.Log.Info("queue_limit is not set, using default value: ", ch.QueueLimit) } if ch.FlushInterval <= 0 { ch.FlushInterval = 5 * time.Second - ch.Log.Info("flush_interval is not set, using default value:", ch.FlushInterval) + ch.Log.Info("flush_interval is not set, using default value: ", ch.FlushInterval) } ch.metricQueue = make([]telegraf.Metric, 0, ch.QueueSize) @@ -179,35 +179,16 @@ type NullUint64 struct { Valid bool } -func (ch *ClickHouse) datatypeToNullable(datatype string) interface{} { - var nullable interface{} - - switch datatype { - case "Int64": - nullable = sql.NullInt64{Int64: 0, Valid: false} - case "UInt64": - nullable = NullUint64{Uint64: 0, Valid: false} - case "Float64": - nullable = sql.NullFloat64{Float64: 0, Valid: false} - case "String": - nullable = sql.NullString{String: "", Valid: false} - case "UInt8": - nullable = sql.NullBool{Bool: false, Valid: false} - case "DateTime": - nullable = sql.NullTime{Time: time.Unix(0, 0), Valid: false} - default: - nullable = sql.NullString{String: "", Valid: false} - ch.Log.Errorf("unknown datatype %s", datatype) - } - - return nullable -} - func (ch *ClickHouse) generateCreateTable(tablename string, columns *orderedmap.OrderedMap[string, string]) string { columnDefs := make([]string, 0, columns.Len()) for pair := columns.Oldest(); pair != nil; pair = pair.Next() { - columnDefs = append(columnDefs, fmt.Sprintf("%s %s", quoteIdent(pair.Key), pair.Value)) + columnType := pair.Value + if pair.Key != "host" && pair.Key != ch.TimestampColumn && pair.Key != "measurement" { + columnType = fmt.Sprintf("Nullable(%s)", pair.Value) + } + + columnDefs = append(columnDefs, fmt.Sprintf("%s %s", quoteIdent(pair.Key), columnType)) } orderBy := make([]string, 0, 3) @@ -236,9 +217,14 @@ func (ch *ClickHouse) generateAlterTable(tablename string, columns *orderedmap.O alterDefs := make([]string, 0, columns.Len()) for pair := columns.Oldest(); pair != nil; pair = pair.Next() { + columnType := pair.Value + if pair.Key != "host" && pair.Key != ch.TimestampColumn && pair.Key != "measurement" { + columnType = fmt.Sprintf("Nullable(%s)", pair.Value) + } + alterDefs = append(alterDefs, fmt.Sprintf("ADD COLUMN IF NOT EXISTS %s %s", quoteIdent(pair.Key), - pair.Value)) + columnType)) } return fmt.Sprintf("ALTER TABLE %s %s", @@ -260,7 +246,7 @@ func (ch *ClickHouse) ensureTable(tablename string, columns *orderedmap.OrderedM if err != nil { return err } - ch.Log.Info("Created table", tablename) + ch.Log.Info("Created table: ", tablename) continue } return err @@ -289,7 +275,7 @@ func (ch *ClickHouse) ensureTable(tablename string, columns *orderedmap.OrderedM if err != nil { return err } - ch.Log.Info("Altered table", tablename) + ch.Log.Info("Altered table: ", tablename) break } } @@ -312,16 +298,6 @@ func (ch *ClickHouse) generateInsert(tablename string, columns *orderedmap.Order placeholders) } -func (ch *ClickHouse) nullifyMissingValues(columns *orderedmap.OrderedMap[string, string], metrics []map[string]interface{}) { - for _, metric := range metrics { - for pair := columns.Oldest(); pair != nil; pair = pair.Next() { - if _, ok := metric[pair.Key]; !ok { - metric[pair.Key] = ch.datatypeToNullable(pair.Value) - } - } - } -} - func (ch *ClickHouse) writeMetrics(tablename string, columns *orderedmap.OrderedMap[string, string], metrics []map[string]interface{}) error { err := ch.ensureTable(tablename, columns) if err != nil { @@ -405,10 +381,6 @@ func (ch *ClickHouse) WriteMultiTable(metrics []telegraf.Metric) error { metricsData[tablename] = append(metricsData[tablename], metricEntry) } - - for tablename, metrics := range metricsData { - ch.nullifyMissingValues(columns[tablename], metrics) - } ch.Log.Infof("Prepared %d metrics for writing in %s\n", len(metrics), time.Since(start)) start = time.Now() @@ -453,14 +425,12 @@ func (ch *ClickHouse) WriteSingleTable(metrics []telegraf.Metric) error { for pair := columns.Oldest(); pair != nil; pair = pair.Next() { if _, ok := metricEntry[pair.Key]; !ok { - metricEntry[pair.Key] = ch.datatypeToNullable(pair.Value) + metricEntry[pair.Key] = pair.Value } } metricsData = append(metricsData, metricEntry) } - - ch.nullifyMissingValues(columns, metricsData) ch.Log.Infof("Prepared %d metrics for writing in %s\n", len(metrics), time.Since(start)) start = time.Now() @@ -506,12 +476,12 @@ func (ch *ClickHouse) metricWriter(delay time.Duration) { if ch.TableMode == "single" { err := ch.WriteSingleTable(metrics) if err != nil { - ch.Log.Errorf("Error writing to ClickHouse: %s", err) + ch.Log.Error("Error writing to ClickHouse: ", err) } } else { err := ch.WriteMultiTable(metrics) if err != nil { - ch.Log.Errorf("Error writing to ClickHouse: %s", err) + ch.Log.Error("Error writing to ClickHouse: ", err) } } }