Add WriteSingleTable()

Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
This commit is contained in:
2023-06-29 13:16:52 +03:00
parent 726df648a5
commit 47478a9a89

View File

@@ -281,10 +281,12 @@ func (p *ClickHouse) writeMetrics(tablename string, columns *orderedmap.OrderedM
} }
} }
start := time.Now()
_, err = stmt.Exec(values...) _, err = stmt.Exec(values...)
if err != nil { if err != nil {
return fmt.Errorf("exec failed: %w", err) return fmt.Errorf("exec failed: %w", err)
} }
p.Log.Debugf("Wrote %d metrics to %s in %s", len(metrics), tablename, time.Since(start))
err = tx.Commit() err = tx.Commit()
if err != nil { if err != nil {
@@ -294,9 +296,10 @@ func (p *ClickHouse) writeMetrics(tablename string, columns *orderedmap.OrderedM
} }
func (p *ClickHouse) WriteMultiTable(metrics []telegraf.Metric) error { func (p *ClickHouse) WriteMultiTable(metrics []telegraf.Metric) error {
metricsByTable := make(map[string][]map[string]interface{}) metricsData := make(map[string][]map[string]interface{})
columns := make(map[string]*orderedmap.OrderedMap[string, string]) columns := make(map[string]*orderedmap.OrderedMap[string, string])
start := time.Now()
for _, metric := range metrics { for _, metric := range metrics {
tablename := metric.Name() tablename := metric.Name()
@@ -304,8 +307,8 @@ func (p *ClickHouse) WriteMultiTable(metrics []telegraf.Metric) error {
tablename = p.MultiTableOptions.TablePrefix + "_" + tablename tablename = p.MultiTableOptions.TablePrefix + "_" + tablename
} }
if _, ok := metricsByTable[tablename]; !ok { if _, ok := metricsData[tablename]; !ok {
metricsByTable[tablename] = make([]map[string]interface{}, 0, len(metrics)) metricsData[tablename] = make([]map[string]interface{}, 0, len(metrics))
} }
if _, ok := columns[tablename]; !ok { if _, ok := columns[tablename]; !ok {
@@ -327,21 +330,61 @@ func (p *ClickHouse) WriteMultiTable(metrics []telegraf.Metric) error {
columns[tablename].Set(field.Key, p.deriveDatatype(field.Value)) columns[tablename].Set(field.Key, p.deriveDatatype(field.Value))
} }
metricsByTable[tablename] = append(metricsByTable[tablename], metricEntry) metricsData[tablename] = append(metricsData[tablename], metricEntry)
} }
p.Log.Debugf("Prepared %d metrics for writing in %s", len(metrics), time.Since(start))
for tablename, metrics := range metricsByTable { start = time.Now()
for tablename, metrics := range metricsData {
err := p.writeMetrics(tablename, columns[tablename], metrics) err := p.writeMetrics(tablename, columns[tablename], metrics)
if err != nil { if err != nil {
return err return err
} }
} }
p.Log.Debugf("Wrote %d metrics to %d tables in %s", len(metrics), len(metricsData), time.Since(start))
return nil return nil
} }
func (p *ClickHouse) WriteSingleTable(metrics []telegraf.Metric) error { func (p *ClickHouse) WriteSingleTable(metrics []telegraf.Metric) error {
// TODO tablename := p.SingleTableOptions.TableName
metricsData := make([]map[string]interface{}, 0, len(metrics))
columns := orderedmap.New[string, string](len(metrics))
start := time.Now()
for _, metric := range metrics {
metricName := metric.Name()
metricEntry := make(map[string]interface{})
metricEntry[p.TimestampColumn] = metric.Time()
columns.Set(p.TimestampColumn, p.deriveDatatype(metric.Time()))
metricEntry["measurement"] = metricName
columns.Set("measurement", p.deriveDatatype(metricName))
for _, tag := range metric.TagList() {
colName := fmt.Sprintf("%s_%s", metricName, tag.Key)
metricEntry[colName] = tag.Value
columns.Set(colName, p.deriveDatatype(tag.Value))
}
for _, field := range metric.FieldList() {
colName := fmt.Sprintf("%s_%s", metricName, field.Key)
metricEntry[colName] = field.Value
columns.Set(colName, p.deriveDatatype(field.Value))
}
metricsData = append(metricsData, metricEntry)
}
p.Log.Debugf("Prepared %d metrics for writing in %s", len(metrics), time.Since(start))
start = time.Now()
err := p.writeMetrics(tablename, columns, metricsData)
if err != nil {
return err
}
p.Log.Debugf("Wrote %d metrics to %s in %s", len(metrics), tablename, time.Since(start))
return nil return nil
} }