diff --git a/plugins/outputs/clickhouse/clickhouse.go b/plugins/outputs/clickhouse/clickhouse.go index ae98a80..ba6d58a 100644 --- a/plugins/outputs/clickhouse/clickhouse.go +++ b/plugins/outputs/clickhouse/clickhouse.go @@ -40,8 +40,8 @@ type ClickHouse struct { ConnectionMaxIdle int `toml:"connection_max_idle"` ConnectionMaxOpen int `toml:"connection_max_open"` - db *gosql.DB - Log telegraf.Logger `toml:"-"` + db *gosql.DB + Log telegraf.Logger `toml:"-"` } func (*ClickHouse) SampleConfig() string { @@ -281,10 +281,12 @@ func (p *ClickHouse) writeMetrics(tablename string, columns *orderedmap.OrderedM } } + start := time.Now() _, err = stmt.Exec(values...) if err != nil { return fmt.Errorf("exec failed: %w", err) } + p.Log.Debugf("Wrote %d metrics to %s in %s", len(metrics), tablename, time.Since(start)) err = tx.Commit() if err != nil { @@ -294,9 +296,10 @@ func (p *ClickHouse) writeMetrics(tablename string, columns *orderedmap.OrderedM } func (p *ClickHouse) WriteMultiTable(metrics []telegraf.Metric) error { - metricsByTable := make(map[string][]map[string]interface{}) + metricsData := make(map[string][]map[string]interface{}) columns := make(map[string]*orderedmap.OrderedMap[string, string]) + start := time.Now() for _, metric := range metrics { tablename := metric.Name() @@ -304,8 +307,8 @@ func (p *ClickHouse) WriteMultiTable(metrics []telegraf.Metric) error { tablename = p.MultiTableOptions.TablePrefix + "_" + tablename } - if _, ok := metricsByTable[tablename]; !ok { - metricsByTable[tablename] = make([]map[string]interface{}, 0, len(metrics)) + if _, ok := metricsData[tablename]; !ok { + metricsData[tablename] = make([]map[string]interface{}, 0, len(metrics)) } if _, ok := columns[tablename]; !ok { @@ -327,21 +330,61 @@ func (p *ClickHouse) WriteMultiTable(metrics []telegraf.Metric) error { columns[tablename].Set(field.Key, p.deriveDatatype(field.Value)) } - metricsByTable[tablename] = append(metricsByTable[tablename], metricEntry) + metricsData[tablename] = append(metricsData[tablename], metricEntry) } + p.Log.Debugf("Prepared %d metrics for writing in %s", len(metrics), time.Since(start)) - for tablename, metrics := range metricsByTable { + start = time.Now() + for tablename, metrics := range metricsData { err := p.writeMetrics(tablename, columns[tablename], metrics) if err != nil { return err } } + p.Log.Debugf("Wrote %d metrics to %d tables in %s", len(metrics), len(metricsData), time.Since(start)) return nil } func (p *ClickHouse) WriteSingleTable(metrics []telegraf.Metric) error { - // TODO + tablename := p.SingleTableOptions.TableName + metricsData := make([]map[string]interface{}, 0, len(metrics)) + columns := orderedmap.New[string, string](len(metrics)) + + start := time.Now() + for _, metric := range metrics { + metricName := metric.Name() + + metricEntry := make(map[string]interface{}) + metricEntry[p.TimestampColumn] = metric.Time() + columns.Set(p.TimestampColumn, p.deriveDatatype(metric.Time())) + + metricEntry["measurement"] = metricName + columns.Set("measurement", p.deriveDatatype(metricName)) + + for _, tag := range metric.TagList() { + colName := fmt.Sprintf("%s_%s", metricName, tag.Key) + metricEntry[colName] = tag.Value + columns.Set(colName, p.deriveDatatype(tag.Value)) + } + + for _, field := range metric.FieldList() { + colName := fmt.Sprintf("%s_%s", metricName, field.Key) + metricEntry[colName] = field.Value + columns.Set(colName, p.deriveDatatype(field.Value)) + } + + metricsData = append(metricsData, metricEntry) + } + p.Log.Debugf("Prepared %d metrics for writing in %s", len(metrics), time.Since(start)) + + start = time.Now() + err := p.writeMetrics(tablename, columns, metricsData) + if err != nil { + return err + } + p.Log.Debugf("Wrote %d metrics to %s in %s", len(metrics), tablename, time.Since(start)) + return nil }