Guide: Using the Observer Pattern¶
Feature overview: Metrics Observer
Runnable demos:
examples/stats-observer·examples/adapters-nethttp·examples/adapters-mqtt·examples/flat-key-patch
End-to-end example: using all six interfaces¶
A single stats.NewFanout value implements all six observer interfaces. Pass it to HTTP, files,
forge pipelines, and codecs — no type assertions needed:
package main
import (
"context"
"fmt"
"log/slog"
"os"
"sync"
"time"
"github.com/DaniDeer/go-codex/codex"
"github.com/DaniDeer/go-codex/forge"
"github.com/DaniDeer/go-codex/format"
"github.com/DaniDeer/go-codex/stats"
"github.com/DaniDeer/go-codex/validate"
)
// ── 1. Pure metrics observer — counts everything ───────────────────────────
// Metrics is a counter-only observer: every callback it cares about bumps one
// integer field under mu. Subscribe and publish callbacks are accepted but
// deliberately ignored.
type Metrics struct {
	mu         sync.Mutex
	requests   int
	applies    int
	fileReads  int
	fileWrites int
	valErrors  int
	rejections int
}

// RecordRequest counts one request; the arguments are not inspected.
func (m *Metrics) RecordRequest(_, _ string, _ int, _ time.Duration) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.requests++
}

// RecordSubscribe is a no-op.
func (m *Metrics) RecordSubscribe(_ string, _ bool, _ time.Duration) {
}

// RecordPublish is a no-op.
func (m *Metrics) RecordPublish(_ string, _ bool, _ time.Duration) {
}

// RecordValidationError counts one validation error.
func (m *Metrics) RecordValidationError(_, _, _ string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.valErrors++
}

// RecordApply counts one pipeline apply, regardless of its outcome.
func (m *Metrics) RecordApply(_, _ string, _ bool, _ time.Duration) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.applies++
}

// RecordSecurityRejection counts one security rejection.
func (m *Metrics) RecordSecurityRejection(_, _ string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.rejections++
}

// RecordFileRead counts one file read, regardless of its outcome.
func (m *Metrics) RecordFileRead(_ string, _ bool, _ time.Duration) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.fileReads++
}

// RecordFileWrite counts one file write, regardless of its outcome.
func (m *Metrics) RecordFileWrite(_ string, _ bool, _ time.Duration) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.fileWrites++
}
// ── 2. Trace observer — records span names in memory ───────────────────────
// Tracer is an in-memory trace observer: each StartSpan call appends
// "op:name" to entries and bumps spans. It embeds stats.NoopObserver
// (presumably to supply no-op versions of the remaining observer methods —
// confirm against the stats package).
type Tracer struct {
	stats.NoopObserver
	mu      sync.Mutex
	entries []string
	spans   int // number of StartSpan calls; was incremented but never declared
}

// StartSpan records the span as "op:name" and returns ctx unchanged.
func (t *Tracer) StartSpan(ctx context.Context, op, name string) context.Context {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.entries = append(t.entries, op+":"+name)
	t.spans++
	return ctx
}

// EndSpan is a no-op; this tracer only records span starts.
func (t *Tracer) EndSpan(_ context.Context, _ error) {}
// main composes the Metrics counter, a slog-backed logging observer and the
// in-memory Tracer into one fan-out observer, drives it through a codec
// decode, a file read and a forge pipeline, then prints the counters.
func main() {
	// Debug level, so the level=DEBUG lines quoted in the comments below are
	// actually emitted by the handler (LevelInfo would drop them).
	logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))
	// ── 3. Compose all three into a single observer ────────────────────────
	metrics := &Metrics{}
	tracer := &Tracer{}
	obs := stats.NewFanout(metrics, stats.NewLoggingObserver(logger.With("component", "app")), tracer)
	// ── 4. Codec validation (ValidationObserver) ───────────────────────────
	emailCodec := codex.String().Refine(validate.Email)
	_, err := emailCodec.Decode("not-an-email")
	stats.ReportErrors(obs, "config", err)
	// ── 5. File read (FileObserver + TraceObserver) ────────────────────────
	const cfgPath = "/tmp/test.json"
	f := format.NewFile(cfgPath, format.JSON(codex.String()))
	if _, err := f.Read(nil, format.FileOptions{Observer: obs}); err != nil {
		// Surface the failure instead of silently overwriting err; the demo
		// carries on so the remaining observers are still exercised.
		logger.Warn("file read failed", "path", cfgPath, "err", err)
	}
	// LoggingObserver: level=DEBUG msg="file read" path=... success=false
	// TraceObserver: StartSpan("file.read", "/tmp/test.json")
	// Metrics: fileReads++
	// ── 6. Forge pipeline (PipelineObserver + TraceObserver) ───────────────
	// NOTE(review): despite its name, the "toUpper" body returns its input
	// unchanged — confirm whether strings.ToUpper was intended.
	fn := forge.NewFunction("toUpper", "1.0.0",
		codex.String(),
		codex.String(),
		func(s string) (string, error) { return s, nil },
	)
	reg := forge.NewRegistry("demo", "1.0.0").WithObserver(obs)
	fn.Register(reg)
	fn.Apply("hello")
	// LoggingObserver: level=DEBUG msg="pipeline apply" ...
	// Metrics: applies++
	// ── 7. Summary ─────────────────────────────────────────────────────────
	fmt.Printf("requests=%d applies=%d fileReads=%d fileWrites=%d valErrors=%d rejections=%d\n",
		metrics.requests, metrics.applies, metrics.fileReads, metrics.fileWrites,
		metrics.valErrors, metrics.rejections)
}
Per-adapter usage¶
Codec-level (ValidationObserver)¶
// ConfigMetrics counts validation errors reported for a codec.
type ConfigMetrics struct {
	errors int
}

// RecordValidationError adds one to the error count; location, constraint
// name and field are ignored.
func (o *ConfigMetrics) RecordValidationError(_, _, _ string) {
	o.errors += 1
}
// Fan the custom counter out together with a slog-backed logging observer.
metrics := &ConfigMetrics{}
obs := stats.NewFanout(metrics, stats.NewLoggingObserver(slog.Default()))
// Decode with the codec, then hand the resulting error to the observer under
// the caller-chosen location label "config".
val, err := appConfigCodec.Decode(rawData)
stats.ReportErrors(obs, "config", err)
stats.ConstraintName(err) extracts a stable label: ConstraintError.Name, "type-mismatch", "required", or "".
HTTP adapter (Observer)¶
// CountingObserver keeps in-memory request and validation-error statistics.
// All fields are guarded by mu; the maps are created lazily so the zero value
// is ready to use.
type CountingObserver struct {
	mu             sync.Mutex
	total          int
	byStatus       map[int]int
	valErrorsByLoc map[string]int
	latencies      []time.Duration
}

// RecordRequest counts the request, tallies it by status code and stores its
// latency. method and path are not recorded.
func (o *CountingObserver) RecordRequest(method, path string, statusCode int, d time.Duration) {
	o.mu.Lock()
	defer o.mu.Unlock()
	o.total++
	if o.byStatus == nil {
		o.byStatus = make(map[int]int)
	}
	o.byStatus[statusCode]++
	o.latencies = append(o.latencies, d)
}

// RecordValidationError tallies the error by location. constraintName and
// field are not recorded.
func (o *CountingObserver) RecordValidationError(location, constraintName, field string) {
	o.mu.Lock()
	defer o.mu.Unlock()
	if o.valErrorsByLoc == nil {
		o.valErrorsByLoc = make(map[string]int)
	}
	o.valErrorsByLoc[location]++
}

// RecordSubscribe is a no-op.
func (o *CountingObserver) RecordSubscribe(_ string, _ bool, _ time.Duration) {}

// RecordPublish is a no-op.
func (o *CountingObserver) RecordPublish(_ string, _ bool, _ time.Duration) {}

// Print writes one line per location with its validation-error count to
// standard output. Map iteration order is unspecified, so line order may vary.
func (o *CountingObserver) Print() {
	// Hold the lock while ranging: the Record* methods mutate this map under
	// mu, and an unguarded concurrent read would be a data race.
	o.mu.Lock()
	defer o.mu.Unlock()
	for loc, n := range o.valErrorsByLoc {
		fmt.Printf(" validation errors at %q: %d\n", loc, n)
	}
}
// Compile-time check that *CountingObserver satisfies stats.Observer.
var _ stats.Observer = (*CountingObserver)(nil)
// Combine the counter with a logging observer and hand the result to the HTTP
// adapter through Options.Observer.
obs := stats.NewFanout(&CountingObserver{}, stats.NewLoggingObserver(logger))
nethttp.Register(mux, createUser, handler, nethttp.Options{Observer: obs})
MQTT adapter¶
// The observer is supplied per call through the Observer field of
// SubscribeOptions and PublishOptions.
amqtt.SubscribeHandler(ctx, channel, handler, amqtt.SubscribeOptions{Observer: obs})
amqtt.Publish(ctx, client, channel, qos, retained, msg, vars,
amqtt.PublishOptions{Observer: obs})
Forge pipeline (PipelineObserver)¶
// PipelineCounts tallies pipeline applications and how many of them failed.
// Both counters are guarded by mu.
type PipelineCounts struct {
	mu       sync.Mutex
	applies  int
	failures int
}

// RecordApply counts one application and, when ok is false, one failure.
// name, version and d are not recorded.
func (pc *PipelineCounts) RecordApply(name, version string, ok bool, d time.Duration) {
	failed := 0
	if !ok {
		failed = 1
	}

	pc.mu.Lock()
	pc.applies++
	pc.failures += failed
	pc.mu.Unlock()
}
// Attach the fan-out of counter + logging observer to the forge registry.
obs := stats.NewFanout(&PipelineCounts{}, stats.NewLoggingObserver(logger))
reg := forge.NewRegistry("Pipeline", "1.0.0").WithObserver(obs)
FileObserver (format.File)¶
format.File[T] type-asserts the observer in FileOptions to stats.FileObserver:
// FileMetrics counts file reads and writes. Both counters are guarded by mu.
type FileMetrics struct {
	mu         sync.Mutex
	fileReads  int
	fileWrites int
}

// RecordFileRead counts one read; path, ok and d are not recorded.
func (o *FileMetrics) RecordFileRead(path string, ok bool, d time.Duration) {
	o.mu.Lock()
	o.fileReads++
	o.mu.Unlock()
}

// RecordFileWrite counts one write; path, ok and d are not recorded.
func (o *FileMetrics) RecordFileWrite(path string, ok bool, d time.Duration) {
	o.mu.Lock()
	o.fileWrites++
	o.mu.Unlock()
}
// Compile-time check that *FileMetrics satisfies stats.FileObserver.
var _ stats.FileObserver = (*FileMetrics)(nil)
// The observer reaches format.File through FileOptions.Observer.
obs := stats.NewFanout(&FileMetrics{}, stats.NewLoggingObserver(logger))
opts := format.FileOptions{Observer: obs}
cfg, err := configFile.Read(nil, opts)
path is the concrete path after template substitution, never the template string.
SecurityObserver¶
Adapters type-assert stats.SecurityObserver — purely additive, existing implementations need not change:
// MyObserver extends CountingObserver with a security-rejection counter.
type MyObserver struct {
	CountingObserver
	securityRejections int
}

// RecordSecurityRejection counts one rejection; location and scheme are not
// recorded.
func (o *MyObserver) RecordSecurityRejection(location, scheme string) {
	// Reuse the embedded CountingObserver's mutex so this counter is guarded
	// the same way as the counters it sits next to.
	o.mu.Lock()
	defer o.mu.Unlock()
	o.securityRejections++
}
TraceObserver (distributed tracing)¶
6th optional interface — type-asserted by adapters, never embedded.
// TraceObserver is the optional tracing interface. Adapters discover it by
// type assertion on the configured observer.
type TraceObserver interface {
	// StartSpan is given the current context plus an operation and a name,
	// and returns the context to use for the traced work.
	StartSpan(ctx context.Context, operation string, name string) context.Context

	// EndSpan receives a context and the error (nil on success) of the
	// finished work.
	EndSpan(ctx context.Context, err error)
}
OpenTelemetry implementation:
// OTelTracer implements the StartSpan/EndSpan pair on top of the
// OpenTelemetry API. It embeds stats.NoopObserver (presumably for no-op
// versions of the other observer methods — confirm against the stats package).
type OTelTracer struct{ stats.NoopObserver }

// StartSpan starts a span named after operation on the "go-codex" tracer,
// attaches name as the "name" attribute, and returns the context carrying
// that span.
func (t *OTelTracer) StartSpan(ctx context.Context, operation, name string) context.Context {
	// The span itself is not needed here: EndSpan retrieves it from the
	// returned context. WithAttributes is exported by the trace package.
	ctx, _ = otel.Tracer("go-codex").Start(ctx, operation,
		trace.WithAttributes(attribute.String("name", name)),
	)
	return ctx
}

// EndSpan ends the span found in ctx. A non-nil err is recorded on the span
// and the span status is set to Error first.
func (t *OTelTracer) EndSpan(ctx context.Context, err error) {
	span := trace.SpanFromContext(ctx)
	if err != nil {
		span.RecordError(err)
		// RecordError alone does not change the span status.
		span.SetStatus(codes.Error, err.Error())
	}
	span.End()
}
// The tracer is passed to NewFanout alongside the metrics and logging observers.
obs := stats.NewFanout(metrics, stats.NewLoggingObserver(logger), &OTelTracer{})
Note:
LoggingObserver does not implement TraceObserver (slog has no tracing). Use a slog→OTel bridge for log-trace correlation.
Context propagation through layers¶
TraceObserver spans form a parent-child tree. go-codex adapters propagate the traced
context.Context through the application, enabling full trace chains.
Flow diagram¶
Service A (client)
nethttp.Call(ctx, ...)
└─ traceparent header → Service B
Service B (server)
handler(ctx, req) ← ctx carries incoming span
├─ ApplyContext(ctx, in) ← forge span becomes child of HTTP span
└─ FileOptions{Context: ctx} ← file span becomes child of HTTP span
Adapters (already propagate ctx)¶
| Entry point | ctx purpose |
|---|---|
| nethttp.Call(ctx, url, handle, req, vars, opts) | Creates child span, sends traceparent header |
| mqtt.SubscribeHandler(ctx, handle, fn, opts) | Parent for subscribe span, passed to fn(ctx, value) |
| mqtt.Publish(ctx, client, handle, qos, retained, msg, vars, opts) | Creates child span for publish |
Forge (use ApplyContext)¶
// Pass the handler's ctx so the forge span is attached to it.
result, err := oeeCalc.ApplyContext(ctx, OEEIn{
Availability: 0.9,
Performance: 0.85,
Quality: 0.95,
})
// forge.apply span is child of HTTP handler span
Apply(in) is unchanged — uses context.Background(). ApplyContext(ctx, in) was added to
enable context propagation without breaking existing callers.
File I/O (set FileOptions.Context)¶
// Carry the caller's ctx in FileOptions so the file span is parented to it.
opts := format.FileOptions{
Observer: metrics,
Context: ctx, // file.read span is child of HTTP handler span
}
value, err := configFile.Read(nil, opts)
FileOptions.Context is optional — when nil, falls back to context.Background().
Full example: HTTP handler → forge → file¶
// handler computes the OEE for req with the forge function and writes the
// result to resultFile, passing ctx to both so their spans hang off the
// incoming HTTP span. Any error from either step is returned unchanged with a
// zero MyResponse.
func handler(ctx context.Context, req MyRequest) (MyResponse, error) {
	// ctx already carries the HTTP span.

	// Step 1: forge computation as child span.
	input := OEEIn{
		Availability: req.Availability,
		Performance:  req.Performance,
		Quality:      req.Quality,
	}
	result, err := oeeCalc.ApplyContext(ctx, input)
	if err != nil {
		return MyResponse{}, err
	}

	// Step 2: write result to file as child span.
	writeOpts := format.FileOptions{Observer: obs, Context: ctx}
	if werr := resultFile.Write(nil, result, writeOpts); werr != nil {
		return MyResponse{}, werr
	}

	return MyResponse{OEE: result.OEE}, nil
}
Operation values (TraceObserver)¶
| Operation | Call site |
|---|---|
| "http.request" | nethttp/chi — handler or client call |
| "mqtt.subscribe" | mqtt — SubscribeHandler |
| "mqtt.publish" | mqtt — Publish |
| "forge.apply" | forge — Apply |
| "file.read" | format.File — Read / Update |
| "file.write" | format.File — Write / Patch / PatchEncoded |
| "mcp.tool" | mcpgo — ToolHandler |
| "mcp.resource" | mcpgo — ResourceHandler |
| "mcp.prompt" | mcpgo — PromptHandler |
Observer location values by adapter¶
| Location | Adapter / use case |
|---|---|
| "body" | nethttp/chi — request or response body decode/encode |
| "query" | nethttp/chi — query parameter validation |
| "cookie" | nethttp/chi — request cookie parameter validation |
| "header" | nethttp/chi — request header parameter validation |
| "response_header" | nethttp/chi — response header parameter validation |
| "response_cookie" | nethttp/chi — response cookie parameter validation |
| "path" | nethttp/chi — path parameter validation |
| "payload" | mqtt — message payload decode (subscribe) or encode (publish) |
| "topic_var" | mqtt — per-variable codec failure in topic template |
| "topic" | mqtt — topic-level codec failure |
| "input" | mcpgo — tool argument decode/validation |
| "prompt.args" | mcpgo — prompt argument codec failure |
| "file" | format.File — per-field codec failure during read/write |
| any string | codec-only: choose your own label ("config", "input", etc.) |
Prometheus example¶
// PrometheusObserver forwards observer callbacks to Prometheus collectors.
// The label order expected by each vector is noted next to the field.
type PrometheusObserver struct {
	requests   *prometheus.CounterVec   // labels: method, path, status
	subscribed *prometheus.CounterVec   // labels: topic, success
	published  *prometheus.CounterVec   // labels: topic, success
	valErrors  *prometheus.CounterVec   // labels: location, constraint, field
	latency    *prometheus.HistogramVec // labels: method, path
}

// RecordRequest increments the request counter for (method, path, status) and
// observes the latency in seconds for (method, path).
func (o *PrometheusObserver) RecordRequest(method, path string, code int, d time.Duration) {
	status := strconv.Itoa(code)
	o.requests.WithLabelValues(method, path, status).Inc()

	seconds := d.Seconds()
	o.latency.WithLabelValues(method, path).Observe(seconds)
}

// RecordSubscribe increments the subscribe counter for (topic, success).
func (o *PrometheusObserver) RecordSubscribe(topic string, ok bool, _ time.Duration) {
	success := strconv.FormatBool(ok)
	o.subscribed.WithLabelValues(topic, success).Inc()
}

// RecordPublish increments the publish counter for (topic, success).
func (o *PrometheusObserver) RecordPublish(topic string, ok bool, _ time.Duration) {
	success := strconv.FormatBool(ok)
	o.published.WithLabelValues(topic, success).Inc()
}

// RecordValidationError increments the validation-error counter for
// (location, constraint, field).
func (o *PrometheusObserver) RecordValidationError(loc, constraint, field string) {
	counter := o.valErrors.WithLabelValues(loc, constraint, field)
	counter.Inc()
}
OpenTelemetry tracing example¶
A TraceObserver implementation wrapping the OpenTelemetry SDK. Wire it alongside
metrics and logging via stats.NewFanout:
import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)
// OTelTracer implements the StartSpan/EndSpan pair on top of the
// OpenTelemetry API. It embeds stats.NoopObserver (presumably for no-op
// versions of the other observer methods — confirm against the stats package).
type OTelTracer struct{ stats.NoopObserver }

// StartSpan starts a span named op on the "go-codex" tracer, attaches name as
// the "name" attribute, and returns the context carrying that span.
func (t *OTelTracer) StartSpan(ctx context.Context, op, name string) context.Context {
	// Discard the span value: declaring it without using it does not compile,
	// and EndSpan retrieves the span from the returned context anyway.
	// WithAttributes is exported by the trace package.
	ctx, _ = otel.Tracer("go-codex").Start(ctx, op,
		trace.WithAttributes(attribute.String("name", name)),
	)
	return ctx
}

// EndSpan ends the span found in ctx. A non-nil err is recorded on the span
// and the span status is set to Error first.
func (t *OTelTracer) EndSpan(ctx context.Context, err error) {
	span := trace.SpanFromContext(ctx)
	if err != nil {
		span.RecordError(err)
		span.SetStatus(codes.Error, err.Error())
	}
	span.End()
}
// Wire alongside metrics and logging:
obs := stats.NewFanout(metrics, stats.NewLoggingObserver(logger), &OTelTracer{})
// The same obs propagates traces across every layer:
// HTTP adapter via Options.Observer, file I/O via FileOptions.Observer,
// forge via Registry.WithObserver.
nethttp.Register(mux, route, handler, nethttp.Options{Observer: obs})
configFile.Read(nil, format.FileOptions{Observer: obs})
forge.NewRegistry("Pipeline", "1.0.0").WithObserver(obs)
How it works:
- Server adapters pass the incoming *http.Request.Context() to StartSpan. When an OTel middleware has extracted a traceparent header, the new span is a child. Without middleware, a root span is created.
- Client adapters (nethttp.Call, mqtt.Publish) create a child span from the user-provided ctx. For HTTP, the traceparent header propagates via the SDK's TextMapPropagator.

Note:
LoggingObserver does not implement TraceObserver (slog has no tracing built-in). To correlate log output with trace IDs, configure the logging observer's logger with an OTel slog handler:

import (
"log/slog"
"go.opentelemetry.io/contrib/slog" // OTel slog handler
"os"
)
otelHandler := slogotel.NewHandler(slog.NewJSONHandler(os.Stdout), nil)
logger := slog.New(otelHandler)
obs := stats.NewFanout(metrics, stats.NewLoggingObserver(logger), &OTelTracer{})

Every line emitted by LoggingObserver now carries trace_id and span_id automatically — correlated with the active trace in your observability backend. This works because the handler picks up the OTel SDK's global TracerProvider and TextMapPropagator at runtime.
Using go-logx as the logger backend¶
go-logx produces a *slog.Logger with rotating file output,
buffered writes, and static service/build attrs — a drop-in for slog.NewTextHandler:
import "github.com/DaniDeer/go-logx/logx"
// Build the *slog.Logger from logx: console output at debug level, a log
// file at info level, plus static service/env and build attributes.
logger, cleanup, err := logx.New(logx.Config{
	Console:   true,
	Level:     slog.LevelDebug,
	File:      "/var/log/myapp.log",
	FileLevel: slog.LevelInfo,
	DefaultAttrs: []slog.Attr{
		slog.String("service", "order-api"),
		slog.String("env", "prod"),
	},
	Build: &logx.BuildInfo{Version: version, Commit: commit, Date: date},
})
if err != nil {
	// Do not fall through to defer cleanup() / logger.With(...) with values
	// from a failed constructor — they may well be nil.
	slog.Error("creating logx logger", "err", err)
	os.Exit(1)
}
// cleanup must run on exit, otherwise buffered lines can be lost.
defer cleanup()
slog.SetDefault(logger)
obs := stats.NewFanout(metrics, stats.NewLoggingObserver(logger.With("component", "http")))
Every log line carries service=order-api env=prod build.version=... automatically. Omitting defer cleanup() risks losing buffered lines on exit.
See also¶
- examples/stats-observer — codec-only ValidationObserver
- examples/adapters-nethttp — HTTP metrics via NewFanout
- examples/adapters-mqtt — MQTT metrics via NewFanout
- examples/flat-key-patch — FileObserver with NewFanout
- examples/forge-collection — forge PipelineObserver with NewFanout
- examples/oee-chain — PipelineObserver across all three layers
- examples/http-trace-span-propagation — end-to-end trace propagation: HTTP client → server → forge → file