diff --git a/plugins/outputs/clickhouse/clickhouse.go b/plugins/outputs/clickhouse/clickhouse.go index 1a53b92..0afaae5 100644 --- a/plugins/outputs/clickhouse/clickhouse.go +++ b/plugins/outputs/clickhouse/clickhouse.go @@ -174,16 +174,17 @@ func (ch *ClickHouse) valueToDatatype(value interface{}) string { return datatype } +func (ch *ClickHouse) toNullableType(pair *orderedmap.Pair[string, string]) string { + if pair.Key != "host" && pair.Key != ch.TimestampColumn && pair.Key != "measurement" { + return fmt.Sprintf("Nullable(%s)", pair.Value) + } + return pair.Value +} + func (ch *ClickHouse) generateCreateTable(tablename string, columns *orderedmap.OrderedMap[string, string]) string { columnDefs := make([]string, 0, columns.Len()) - for pair := columns.Oldest(); pair != nil; pair = pair.Next() { - columnType := pair.Value - if pair.Key != "host" && pair.Key != ch.TimestampColumn && pair.Key != "measurement" { - columnType = fmt.Sprintf("Nullable(%s)", pair.Value) - } - - columnDefs = append(columnDefs, fmt.Sprintf("%s %s", quoteIdent(pair.Key), columnType)) + columnDefs = append(columnDefs, fmt.Sprintf("%s %s", quoteIdent(pair.Key), ch.toNullableType(pair))) } orderBy := make([]string, 0, 3) @@ -210,21 +211,24 @@ func (ch *ClickHouse) generateCreateTable(tablename string, columns *orderedmap. func (ch *ClickHouse) generateAlterTable(tablename string, columns *orderedmap.OrderedMap[string, string]) string { alterDefs := make([]string, 0, columns.Len()) + modifyDefs := make([]string, 0, columns.Len()) for pair := columns.Oldest(); pair != nil; pair = pair.Next() { - columnType := pair.Value - if pair.Key != "host" && pair.Key != ch.TimestampColumn && pair.Key != "measurement" { - columnType = fmt.Sprintf("Nullable(%s)", pair.Value) - } + columnType := ch.toNullableType(pair) alterDefs = append(alterDefs, fmt.Sprintf("ADD COLUMN IF NOT EXISTS %s %s", quoteIdent(pair.Key), columnType)) + + modifyDefs = append(modifyDefs, fmt.Sprintf("MODIFY COLUMN IF EXISTS %s %s", + quoteIdent(pair.Key), + columnType)) } - return fmt.Sprintf("ALTER TABLE %s %s", + return fmt.Sprintf("ALTER TABLE %s %s, %s", quoteIdent(tablename), - strings.Join(alterDefs, ",")) + strings.Join(alterDefs, ","), + strings.Join(modifyDefs, ",")) } func (ch *ClickHouse) ensureTable(tablename string, columns *orderedmap.OrderedMap[string, string]) error { @@ -251,21 +255,22 @@ func (ch *ClickHouse) ensureTable(tablename string, columns *orderedmap.OrderedM break } - tableColumns := make(map[string]struct{}) + tableColumns := make(map[string]string) for res.Next() { var name string + var dtype string var _i string - err = res.Scan(&name, &_i, &_i, &_i, &_i, &_i, &_i) + err = res.Scan(&name, &dtype, &_i, &_i, &_i, &_i, &_i) if err != nil { return err } - tableColumns[name] = struct{}{} + tableColumns[name] = dtype } for pair := columns.Oldest(); pair != nil; pair = pair.Next() { - if _, ok := tableColumns[pair.Key]; !ok { + if _, ok := tableColumns[pair.Key]; !ok || tableColumns[pair.Key] != ch.toNullableType(pair) { _, err = ch.db.Exec(ch.generateAlterTable(tablename, columns)) if err != nil { return err @@ -289,7 +294,7 @@ func (ch *ClickHouse) generateInsert(tablename string, columns *orderedmap.Order return fmt.Sprintf("INSERT INTO %s (%s) VALUES %s", quoteIdent(tablename), - strings.Join(quotedColumns, ", "), + strings.Join(quotedColumns, ","), placeholders) } @@ -418,12 +423,6 @@ func (ch *ClickHouse) WriteSingleTable(metrics []telegraf.Metric) error { columns.Set(colName, ch.valueToDatatype(field.Value)) } - for pair := columns.Oldest(); pair != nil; pair = pair.Next() { - if _, ok := metricEntry[pair.Key]; !ok { - metricEntry[pair.Key] = pair.Value - } - } - metricsData = append(metricsData, metricEntry) } ch.Log.Infof("Prepared %d metrics for writing in %s\n", len(metrics), time.Since(start))