Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion bigtable/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ const (
metricNameRetryCount = "retry_count"
metricNameDebugTags = "debug_tags"
metricNameConnErrCount = "connectivity_error_count"
metricNameClientUptime = "uptime"

// Metric units
metricUnitMS = "ms"
Expand Down Expand Up @@ -160,6 +161,7 @@ var (
},
recordedPerAttempt: true,
},
metricNameClientUptime: {},
}

// Generates unique client ID in the format go-<random UUID>@<hostname>
Expand Down Expand Up @@ -221,6 +223,8 @@ type builtinMetricsTracerFactory struct {
retryCount metric.Int64Counter
connErrCount metric.Int64Counter
debugTags metric.Int64Counter
clientUptime metric.Int64ObservableGauge
startTime time.Time
}

// Returns an error only if metricsProvider is of an unknown type. All other errors are swallowed
Expand Down Expand Up @@ -250,7 +254,8 @@ func newBuiltinMetricsTracerFactory(ctx context.Context, project, instance, appP
attribute.String(metricLabelKeyClientUID, clientUID),
attribute.String(metricLabelKeyClientName, clientName),
},
shutdown: func() {},
shutdown: func() {},
startTime: time.Now(),
}

// Create default meter provider
Expand Down Expand Up @@ -427,6 +432,27 @@ func (tf *builtinMetricsTracerFactory) createInstruments(meter metric.Meter) err
metric.WithDescription("A counter of internal client events used for debugging."),
metric.WithUnit(metricUnitCount),
)
if err != nil {
return err
}

// Create uptime
startTime := tf.startTime
clientAttrs := tf.clientAttributes
if tf.otelMeterProvider == nil {
// Fallback to built-in meter (custom exporter) if otelMeterProvider is nil (e.g. in some test setups)
tf.clientUptime, err = meter.Int64ObservableGauge(
metricNameClientUptime,
metric.WithDescription("The uptime of the client."),
metric.WithUnit(metricUnitMS),
metric.WithInt64Callback(func(ctx context.Context, obs metric.Int64Observer) error {
if !startTime.IsZero() {
obs.Observe(time.Since(startTime).Milliseconds(), metric.WithAttributes(clientAttrs...))
}
return nil
}),
)
}
return err
}

Expand Down
30 changes: 28 additions & 2 deletions bigtable/metrics_monitoring_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,33 @@ func (me *monitoringExporter) recordToTimeSeriesPb(m otelmetricdata.Metrics) ([]
metric, mr := me.recordToMetricAndMonitoredResourcePbs(m, point.Attributes)
var ts *monitoringpb.TimeSeries
var err error
ts, err = sumToTimeSeries[int64](point, m, mr, kind)
ts, err = dataPointToTimeSeries[int64](point, m, mr, kind)
if err != nil {
errs = append(errs, err)
continue
}
ts.Metric = metric
tss = append(tss, ts)
}
case otelmetricdata.Gauge[int64]:
for _, point := range a.DataPoints {
metric, mr := me.recordToMetricAndMonitoredResourcePbs(m, point.Attributes)
var ts *monitoringpb.TimeSeries
var err error
ts, err = dataPointToTimeSeries[int64](point, m, mr, googlemetricpb.MetricDescriptor_GAUGE)
if err != nil {
errs = append(errs, err)
continue
}
ts.Metric = metric
tss = append(tss, ts)
}
Comment thread
mgarolera marked this conversation as resolved.
case otelmetricdata.Gauge[float64]:
for _, point := range a.DataPoints {
metric, mr := me.recordToMetricAndMonitoredResourcePbs(m, point.Attributes)
var ts *monitoringpb.TimeSeries
var err error
ts, err = dataPointToTimeSeries[float64](point, m, mr, googlemetricpb.MetricDescriptor_GAUGE)
if err != nil {
errs = append(errs, err)
continue
Expand All @@ -271,7 +297,7 @@ func (me *monitoringExporter) recordToTimeSeriesPb(m otelmetricdata.Metrics) ([]
return tss, errors.Join(errs...)
}

func sumToTimeSeries[N int64 | float64](point otelmetricdata.DataPoint[N], metrics otelmetricdata.Metrics, mr *monitoredrespb.MonitoredResource, kind googlemetricpb.MetricDescriptor_MetricKind) (*monitoringpb.TimeSeries, error) {
func dataPointToTimeSeries[N int64 | float64](point otelmetricdata.DataPoint[N], metrics otelmetricdata.Metrics, mr *monitoredrespb.MonitoredResource, kind googlemetricpb.MetricDescriptor_MetricKind) (*monitoringpb.TimeSeries, error) {
interval, err := toTimeIntervalPb(point.StartTime, point.Time, kind)
if err != nil {
return nil, err
Expand Down
60 changes: 51 additions & 9 deletions bigtable/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/wrapperspb"

mexporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric"
"cloud.google.com/go/bigtable/bttest"
"google.golang.org/grpc/metadata"
)
Expand Down Expand Up @@ -245,6 +246,7 @@ func TestNewBuiltinMetricsTracerFactory(t *testing.T) {
metricNameServerLatencies,
metricNameClientBlockingLatencies, metricNameClientBlockingLatencies,
metricNameAppBlockingLatencies,
metricNameClientUptime,
}
wantMetricTypesGCM := []string{}
for _, wantMetricName := range wantMetricNamesStdout {
Expand Down Expand Up @@ -289,6 +291,29 @@ func TestNewBuiltinMetricsTracerFactory(t *testing.T) {
createExporterOptions = origCreateExporterOptions
}()

// Override standard exporter for otelMetricsContext to point to the mock GCM server
origNewOtelMetricsContext := newOtelMetricsContext
newOtelMetricsContext = func(ctx context.Context, cfg metricsConfig) (*otelMetricsContext, error) {
mockExporter, err := mexporter.New(
mexporter.WithProjectID(cfg.project),
mexporter.WithMetricDescriptorTypeFormatter(metricFormatter),
mexporter.WithCreateServiceTimeSeries(),
mexporter.WithMonitoringClientOptions(
option.WithEndpoint(monitoringServer.Endpoint),
option.WithoutAuthentication(),
option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())),
),
)
if err != nil {
return nil, err
}
cfg.customExporter = &mockExporter
return origNewOtelMetricsContext(ctx, cfg)
}
defer func() {
newOtelMetricsContext = origNewOtelMetricsContext
}()

// Setup fake Bigtable server
isFirstAttempt := true
receivedHeader := metadata.MD{}
Expand Down Expand Up @@ -427,11 +452,10 @@ func TestNewBuiltinMetricsTracerFactory(t *testing.T) {

// Get new CreateServiceTimeSeriesRequests
gotCreateTSCalls := monitoringServer.CreateServiceTimeSeriesRequests()
gotMetricCounts := make(map[string]int)
for _, gotCreateTSCall := range gotCreateTSCalls {
gotMetricTypes := []string{}
for _, ts := range gotCreateTSCall.TimeSeries {
// ts.Metric.Type is of the form "bigtable.googleapis.com/internal/client/server_latencies"
gotMetricTypes = append(gotMetricTypes, ts.Metric.Type)
gotMetricCounts[ts.Metric.Type]++

// Assert "streaming" metric label is correct
gotStreaming, gotStreamingExists := ts.Metric.Labels[metricLabelKeyStreamingOperation]
Expand All @@ -446,14 +470,32 @@ func TestNewBuiltinMetricsTracerFactory(t *testing.T) {
}

// Assert "method" metric label is correct
wantMethod := "Bigtable.ReadRows"
if gotLabel, ok := ts.Metric.Labels[metricLabelKeyMethod]; !ok || gotLabel != wantMethod {
t.Errorf("Metric label key: %s, value: got: %v, want: %v", metricLabelKeyMethod, gotLabel, wantMethod)
if internalMetricName != metricNameClientUptime {
wantMethod := "Bigtable.ReadRows"
if gotLabel, ok := ts.Metric.Labels[metricLabelKeyMethod]; !ok || gotLabel != wantMethod {
t.Errorf("Metric label key: %s, value: got: %v, want: %v", metricLabelKeyMethod, gotLabel, wantMethod)
}
}
}
}

if test.wantBuiltinEnabled {
// Count expected metrics
wantMetricCounts := make(map[string]int)
for _, wantMetric := range wantMetricTypesGCM {
wantMetricCounts[wantMetric]++
}

// Assert we got at least the expected counts
for wantMetric, wantCount := range wantMetricCounts {
gotCount := gotMetricCounts[wantMetric]
if gotCount < wantCount {
t.Errorf("Metric %s: got count %d, want at least %d. All got: %v", wantMetric, gotCount, wantCount, gotMetricCounts)
}
}
sort.Strings(gotMetricTypes)
if !testutil.Equal(gotMetricTypes, wantMetricTypesGCM) {
t.Errorf("Metric types missing in req. \ngot: %v, \nwant: %v\ndiff: %v", gotMetricTypes, wantMetricTypesGCM, testutil.Diff(gotMetricTypes, wantMetricTypesGCM))
} else {
if len(gotMetricCounts) > 0 {
t.Errorf("Expected no metrics to be exported, but got: %v", gotMetricCounts)
}
}

Expand Down
22 changes: 21 additions & 1 deletion bigtable/otel_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import (
"strings"
"time"

"cloud.google.com/go/bigtable/internal"
mexporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric"
otelmetric "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/contrib/detectors/gcp"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric"
Expand Down Expand Up @@ -176,7 +178,7 @@ type metricsConfig struct {
resourceOpts []resource.Option // used by tests
}

func newOtelMetricsContext(ctx context.Context, cfg metricsConfig) (*otelMetricsContext, error) {
var newOtelMetricsContext = func(ctx context.Context, cfg metricsConfig) (*otelMetricsContext, error) {
var exporter metric.Exporter
meterOpts := []metric.Option{}
if cfg.customExporter == nil {
Expand Down Expand Up @@ -215,6 +217,24 @@ func newOtelMetricsContext(ctx context.Context, cfg metricsConfig) (*otelMetrics
metric.NewPeriodicReader(&exporterLogSuppressor{Exporter: exporter}, metric.WithInterval(interval))))
}
otelMeterProvider := metric.NewMeterProvider(meterOpts...)

// Register client uptime metric on the provider
otelMeter := otelMeterProvider.Meter(builtInMetricsMeterName, otelmetric.WithInstrumentationVersion(internal.Version))
startTime := time.Now()
_, err := otelMeter.Int64ObservableGauge(
"uptime",
otelmetric.WithDescription("The uptime of the client."),
otelmetric.WithUnit("ms"),
otelmetric.WithInt64Callback(func(ctx context.Context, obs otelmetric.Int64Observer) error {
obs.Observe(time.Since(startTime).Milliseconds())
return nil
}),
)
if err != nil {
otelMeterProvider.Shutdown(ctx)
return nil, fmt.Errorf("bigtable: registering uptime metric: %w", err)
}

mo := opentelemetry.MetricsOptions{
MeterProvider: otelMeterProvider,
Metrics: stats.NewMetricSet(
Expand Down
Loading