Observability
This page covers the many ways to view the current state of your Temporal Application: that is, ways to view which Workflow Executions are tracked by the Temporal Platform and the state of any specified Workflow Execution, either currently or at points of an execution.
This section covers features related to viewing the state of the application, including:
- Metrics
- Tracing
- Logging
- Visibility
How to emit metrics
Each Temporal SDK is capable of emitting an optional set of metrics from either the Client or the Worker process. For a complete list of metrics capable of being emitted, see the SDK metrics reference.
Metrics can be scraped and stored in time series databases, such as Prometheus. Temporal also provides a dashboard you can integrate with graphing services like Grafana. For more information, see:
- Temporal's implementation of the Grafana dashboard
- How to export metrics in Grafana
To emit metrics from the Temporal Client in Go, create a metrics handler from the Client Options and specify a listener address to be used by Prometheus.
client.Options{
	MetricsHandler: sdktally.NewMetricsHandler(newPrometheusScope(prometheus.Configuration{
		ListenAddress: "0.0.0.0:9090",
		TimerType:     "histogram",
	})),
}
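The newPrometheusScope helper is not part of the SDK; here is a minimal sketch of one, modeled on the Go metrics sample (the registry setup and error handling are illustrative assumptions):
import (
	"log"
	"time"

	prom "github.com/prometheus/client_golang/prometheus"
	"github.com/uber-go/tally/v4"
	"github.com/uber-go/tally/v4/prometheus"
	sdktally "go.temporal.io/sdk/contrib/tally"
)

func newPrometheusScope(c prometheus.Configuration) tally.Scope {
	reporter, err := c.NewReporter(
		prometheus.ConfigurationOptions{
			Registry: prom.NewRegistry(),
			OnError: func(err error) {
				log.Println("error in prometheus reporter", err)
			},
		},
	)
	if err != nil {
		log.Fatalln("error creating prometheus reporter", err)
	}
	scopeOpts := tally.ScopeOptions{
		CachedReporter:  reporter,
		Separator:       prometheus.DefaultSeparator,
		SanitizeOptions: &sdktally.PrometheusSanitizeOptions,
	}
	// Report buffered metrics once per second.
	scope, _ := tally.NewRootScope(scopeOpts, time.Second)
	// Rename some SDK metrics to conform to Prometheus naming conventions.
	return sdktally.NewPrometheusNamingScope(scope)
}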
The Go SDK currently supports the Tally library; Tally, in turn, offers extensible custom metrics reporting, exposed through the WithCustomMetricsReporter API.
For more information, see the Go sample for metrics.
Tracing and Context Propagation
The Temporal Go SDK supports three tracing implementations: Datadog, OpenTelemetry, and OpenTracing.
Tracing allows you to view the call graph of a Workflow along with its Activities, Nexus Operations, and any Child Workflows.
Tracing can be configured by providing a tracer implementation in ClientOptions during client instantiation.
For details on how to configure and leverage tracing, see the documentation for each implementation.
The OpenTracing support has been validated using Jaeger, but other implementations should also work. Tracing functionality utilizes generic context propagation provided by the client.
Context Propagation
Temporal provides a standard way to propagate a custom context across a Workflow.
You can configure a context propagator via the ClientOptions. The context propagator extracts and passes on information present in context.Context and workflow.Context objects across the Workflow.
Once a context propagator is configured, you should be able to access the required values in the context objects as you would normally do in Go.
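For example, a minimal sketch of registering a propagator on the Client (myPropagator stands in for your ContextPropagator implementation, such as the sketch shown later under Context Propagators):
// Register the propagator when dialing the Client; values placed on the
// context before starting a Workflow are then available inside it as usual.
c, err := client.Dial(client.Options{
	ContextPropagators: []workflow.ContextPropagator{myPropagator},
})
if err != nil {
	log.Fatalln("unable to create Temporal client", err)
}
defer c.Close()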
You can see how the Go SDK implements a tracing context propagator.
Server-Side Headers
On the server side, Temporal provides a mechanism for propagating context across Workflow transitions called headers.
message Header {
map<string, Payload> fields = 1;
}
The Client leverages headers to pass around additional context information.
HeaderReader and HeaderWriter are interfaces that allow reading and writing to the Temporal Server headers.
The SDK includes implementations for these interfaces.
HeaderWriter sets a value for a header. Headers are held as a map, so setting a value for the same key overwrites its previous value. HeaderReader gets the value of a header. It can also iterate through all headers and execute a provided handler function on each one, so that your code can operate on just the headers you need.
type HeaderWriter interface {
Set(string, *commonpb.Payload)
}
type HeaderReader interface {
Get(string) (*commonpb.Payload, bool)
ForEachKey(handler func(string, *commonpb.Payload) error) error
}
Context Propagators
You can propagate additional context through Workflow Execution by using a context propagator.
A context propagator needs to implement the ContextPropagator
interface that includes the following four methods:
type ContextPropagator interface {
Inject(context.Context, HeaderWriter) error
Extract(context.Context, HeaderReader) (context.Context, error)
InjectFromWorkflow(Context, HeaderWriter) error
ExtractToWorkflow(Context, HeaderReader) (Context, error)
}
- Inject reads select context keys from a Go context.Context object and writes them into the headers using the HeaderWriter interface.
- InjectFromWorkflow operates like Inject but reads from a workflow.Context object.
- Extract picks select headers and puts their values into the context.Context object.
- ExtractToWorkflow operates like Extract but writes to a workflow.Context object.
The tracing context propagator shows a sample implementation of a context propagator.
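For illustration, here is a minimal sketch of a propagator that carries a single string value across Workflow transitions. The header key, context key type, and use of the default Data Converter are assumptions for the example, not requirements of the SDK:
import (
	"context"

	"go.temporal.io/sdk/converter"
	"go.temporal.io/sdk/workflow"
)

// propagationKey is the header key under which the value travels (illustrative).
const propagationKey = "custom-context"

// contextKey is an unexported type used to store the value on the context.
type contextKey struct{}

type stringPropagator struct{}

// Inject reads the value from the Go context and writes it to the headers.
func (s *stringPropagator) Inject(ctx context.Context, writer workflow.HeaderWriter) error {
	value, ok := ctx.Value(contextKey{}).(string)
	if !ok {
		return nil // nothing to propagate
	}
	payload, err := converter.GetDefaultDataConverter().ToPayload(value)
	if err != nil {
		return err
	}
	writer.Set(propagationKey, payload)
	return nil
}

// InjectFromWorkflow does the same, but reads from a workflow.Context.
func (s *stringPropagator) InjectFromWorkflow(ctx workflow.Context, writer workflow.HeaderWriter) error {
	value, ok := ctx.Value(contextKey{}).(string)
	if !ok {
		return nil
	}
	payload, err := converter.GetDefaultDataConverter().ToPayload(value)
	if err != nil {
		return err
	}
	writer.Set(propagationKey, payload)
	return nil
}

// Extract reads the header value back into a Go context.
func (s *stringPropagator) Extract(ctx context.Context, reader workflow.HeaderReader) (context.Context, error) {
	if payload, ok := reader.Get(propagationKey); ok {
		var value string
		if err := converter.GetDefaultDataConverter().FromPayload(payload, &value); err != nil {
			return ctx, err
		}
		ctx = context.WithValue(ctx, contextKey{}, value)
	}
	return ctx, nil
}

// ExtractToWorkflow does the same for a workflow.Context.
func (s *stringPropagator) ExtractToWorkflow(ctx workflow.Context, reader workflow.HeaderReader) (workflow.Context, error) {
	if payload, ok := reader.Get(propagationKey); ok {
		var value string
		if err := converter.GetDefaultDataConverter().FromPayload(payload, &value); err != nil {
			return ctx, err
		}
		ctx = workflow.WithValue(ctx, contextKey{}, value)
	}
	return ctx, nil
}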
Is there a complete example?
The context propagation sample configures a custom context propagator and shows context propagation of custom keys across a Workflow and an Activity. It also uses Jaeger for tracing.
Can I configure multiple context propagators?
Yes. Multiple context propagators help to structure code with each propagator having its own scope of responsibility.
Context Propagation Over Nexus Operation Calls
Nexus does not use the standard context propagator header structure.
Instead, it relies on a Temporal-agnostic protocol designed to connect arbitrary systems.
To propagate context over Nexus Operation calls, the context is serialized into a nexus.Header, which is essentially a wrapper around map[string]string with helper methods to Set and Get values. The header normalizes all keys to lowercase.
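For example (the key and value are illustrative):
header := nexus.Header{}
header.Set("My-Tenant-ID", "tenant-42") // stored under "my-tenant-id"
value := header.Get("MY-TENANT-ID")     // lookups are lowercased too: returns "tenant-42"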
Because Nexus uses this custom format, and because Nexus calls may involve external systems, the ContextPropagator
interface doesn’t apply to Nexus headers.
Context must be explicitly propagated through interceptors, as shown in the Nexus Context Propagation sample.
Useful Resources
- Passing Context with Temporal by SpiralScout
The Go SDK provides support for distributed tracing with Interceptors. Interceptors use Temporal headers to create a call graph of a Workflow, along with its Activities and Child Workflows.
There are several tracing implementations supported by the Temporal Go SDK.
For an OpenTracing Interceptor, use opentracing.NewInterceptor(opentracing.TracerOptions{}) to create a TracingInterceptor.
// create Interceptor
tracingInterceptor, err := opentracing.NewInterceptor(opentracing.TracerOptions{})
For an OpenTelemetry Interceptor, use opentelemetry.NewTracingInterceptor(opentelemetry.TracerOptions{}).
// create Interceptor
tracingInterceptor, err := opentelemetry.NewTracingInterceptor(opentelemetry.TracerOptions{})
For a Datadog Interceptor, use tracing.NewTracingInterceptor(tracing.TracerOptions{}).
// create Interceptor
tracingInterceptor, err := tracing.NewTracingInterceptor(tracing.TracerOptions{})
Pass the newly created Interceptor to ClientOptions to enable tracing.
c, err := client.Dial(client.Options{
Interceptors: []interceptor.ClientInterceptor{tracingInterceptor},
})
OpenTracing and OpenTelemetry are natively supported by Jaeger. For more information on configuring and using tracing, see the documentation provided by OpenTracing, OpenTelemetry, and Datadog.
To emit metrics with the Java SDK, use the MicrometerClientStatsReporter class to integrate with a Micrometer MeterRegistry configured for your metrics backend.
Micrometer is a popular Java framework that provides integration with Prometheus and other backends.
The following example shows how to use MicrometerClientStatsReporter to define the metrics scope and set it with the WorkflowServiceStubsOptions.
//...
// see the Micrometer documentation for configuration details on other supported monitoring systems.
// this example shows how to set up a Prometheus registry and stats reporter.
PrometheusMeterRegistry registry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
StatsReporter reporter = new MicrometerClientStatsReporter(registry);
// set up a new scope, report every 10 seconds
Scope scope = new RootScopeBuilder()
.reporter(reporter)
.reportEvery(com.uber.m3.util.Duration.ofSeconds(10));
// for Prometheus collection, expose a scrape endpoint.
//...
// add metrics scope to WorkflowServiceStub options
WorkflowServiceStubsOptions stubOptions =
WorkflowServiceStubsOptions.newBuilder().setMetricsScope(scope).build();
//...
For more details, see the Java SDK Samples. For details on configuring a Prometheus scrape endpoint with Micrometer, see the Micrometer Prometheus Configuring documentation.
Set up tracing
Tracing allows you to view the call graph of a Workflow along with its Activities, Nexus Operations, and any Child Workflows.
Temporal Web's tracing capabilities mainly track Activity Execution within a Temporal context. If you need custom tracing specific to your use case, you should make use of context propagation to add tracing logic accordingly.
To configure tracing in Java, register the OpenTracingClientInterceptor()
interceptor.
You can register the interceptors on both the Temporal Client side and the Worker side.
The following code examples demonstrate the OpenTracingClientInterceptor()
on the Temporal Client.
WorkflowClientOptions.newBuilder()
//...
.setInterceptors(new OpenTracingClientInterceptor())
.build();
WorkflowClientOptions clientOptions =
WorkflowClientOptions.newBuilder()
.setInterceptors(new OpenTracingClientInterceptor(JaegerUtils.getJaegerOptions(type)))
.build();
WorkflowClient client = WorkflowClient.newInstance(service, clientOptions);
The following code examples demonstrate the OpenTracingClientInterceptor()
on the Worker.
WorkerFactoryOptions.newBuilder()
//...
.setWorkerInterceptors(new OpenTracingWorkerInterceptor())
.build();
WorkerFactoryOptions factoryOptions =
WorkerFactoryOptions.newBuilder()
.setWorkerInterceptors(
new OpenTracingWorkerInterceptor(JaegerUtils.getJaegerOptions(type)))
.build();
WorkerFactory factory = WorkerFactory.newInstance(client, factoryOptions);
For more information, see the Temporal OpenTracing module.
Context Propagation Over Nexus Operation Calls
Nexus does not use the standard context propagator header structure.
Instead, it relies on a Temporal-agnostic protocol designed to connect arbitrary systems.
To propagate context over Nexus Operation calls, the context is serialized into a Map<String, String>. This map normalizes all keys to lowercase.
Because Nexus uses this custom format, and because Nexus calls may involve external systems, the ContextPropagator
interface doesn’t apply to Nexus headers.
Context must be explicitly propagated through interceptors, as shown in the Nexus Context Propagation sample.
Metrics in Python are configured globally; therefore, you should set a Prometheus endpoint before any other Temporal code.
The following example exposes a Prometheus endpoint on port 9000.
from temporalio.runtime import Runtime, TelemetryConfig, PrometheusConfig
# Create a new runtime that has telemetry enabled. Create this first to avoid
# the default Runtime from being lazily created.
new_runtime = Runtime(telemetry=TelemetryConfig(metrics=PrometheusConfig(bind_address="0.0.0.0:9000")))
my_client = await Client.connect("my.temporal.host:7233", runtime=new_runtime)
Set up tracing
How to set up tracing
Tracing allows you to view the call graph of a Workflow along with its Activities and any Child Workflows.
Temporal Web's tracing capabilities mainly track Activity Execution within a Temporal context. If you need custom tracing specific to your use case, you should make use of context propagation to add tracing logic accordingly.
To configure tracing in Python, install the opentelemetry
dependencies.
# This command installs the `opentelemetry` dependencies.
pip install temporalio[opentelemetry]
Then the temporalio.contrib.opentelemetry.TracingInterceptor class can be passed as an interceptor argument to Client.connect().
When your Client is connected, spans are created for all Client calls, Activities, and Workflow invocations on the Worker. Spans are created and serialized through the server to give one trace for a Workflow Execution.
Workers can emit metrics and traces. There are a few telemetry options that can be provided to Runtime.install. The common options are:
- metrics: { otel: { url } }: The URL of a gRPC OpenTelemetry collector.
- metrics: { prometheus: { bindAddress } }: Address on the Worker host that will have metrics for Prometheus to scrape.
To set up tracing of Workflows and Activities, use our opentelemetry-interceptors
package.
(For details, see the next section.)
telemetryOptions: {
metrics: {
prometheus: { bindAddress: '0.0.0.0:9464' },
},
logging: { forward: { level: 'DEBUG' } },
},
Set up tracing
Tracing allows you to view the call graph of a Workflow along with its Activities and any Child Workflows.
Temporal Web's tracing capabilities mainly track Activity Execution within a Temporal context. If you need custom tracing specific to your use case, you should make use of context propagation to add tracing logic accordingly.
The interceptors-opentelemetry
sample shows how to use the SDK's built-in OpenTelemetry tracing to trace everything from starting a Workflow to Workflow Execution to running an Activity from that Workflow.
The built-in tracing uses protobuf message headers (like this one when starting a Workflow) to propagate the tracing information from the client to the Workflow and from the Workflow to its successors (when Continued As New), children, and Activities.
All of these executions are linked with a single trace identifier and have the proper parent -> child
span relation.
Tracing is compatible between different Temporal SDKs as long as compatible context propagators are used.
Context propagation
The TypeScript SDK uses the global OpenTelemetry propagator.
To extend the default (Trace Context and Baggage propagators) to also include the Jaeger propagator, follow these steps:
- Install the propagator: npm i @opentelemetry/propagator-jaeger
- At the top level of your Workflow code, add the following lines:
import { propagation } from '@opentelemetry/api';
import {
CompositePropagator,
W3CBaggagePropagator,
W3CTraceContextPropagator,
} from '@opentelemetry/core';
import { JaegerPropagator } from '@opentelemetry/propagator-jaeger';
propagation.setGlobalPropagator(
new CompositePropagator({
propagators: [
new W3CTraceContextPropagator(),
new W3CBaggagePropagator(),
new JaegerPropagator(),
],
}),
);
Similarly, you can customize the OpenTelemetry NodeSDK
propagators by following the instructions in the Initialize the SDK section of the README.md
file.
The Temporal PHP SDK does not support emitting metrics from Client or Worker processes.
Metrics in .NET are configured on the Metrics
property of the Telemetry
property on the TemporalRuntime
. That object should be created globally and should be used for all clients; therefore, you should configure this before any other Temporal code.
Set a Prometheus endpoint
How to set a Prometheus endpoint using the .NET SDK
The following example exposes a Prometheus endpoint on port 9000.
using Temporalio.Client;
using Temporalio.Runtime;
var runtime = new TemporalRuntime(new()
{
Telemetry = new() { Metrics = new() { Prometheus = new("0.0.0.0:9000") } },
});
var client = await TemporalClient.ConnectAsync(new("localhost:7233") { Runtime = runtime });
Set a custom metric meter
How to reuse the .NET metric meter using the Temporal .NET SDK
A custom metric meter can be set on the telemetry options to handle metrics programmatically. The Temporalio.Extensions.DiagnosticSource extension provides a custom metric meter implementation that sends all metrics to a System.Diagnostics.Metrics.Meter instance.
using System.Diagnostics.Metrics;
using Temporalio.Client;
using Temporalio.Extensions.DiagnosticSource;
using Temporalio.Runtime;
// Create .NET meter
using var meter = new Meter("My.Meter");
// Can create MeterListener or OTel meter provider here...
// Create Temporal runtime with a custom metric meter for that meter
var runtime = new TemporalRuntime(new()
{
Telemetry = new()
{
Metrics = new() { CustomMetricMeter = new CustomMetricMeter(meter) },
},
});
var client = await TemporalClient.ConnectAsync(new("localhost:7233") { Runtime = runtime });
Set up tracing
How to configure tracing using the Temporal .NET SDK
Tracing allows you to view the call graph of a Workflow along with its Activities and any Child Workflows.
To configure OpenTelemetry tracing in .NET, use the Temporalio.Extensions.OpenTelemetry extension.
The Temporalio.Extensions.OpenTelemetry.TracingInterceptor
class can be set as an interceptor in the client options.
When your Client is connected, spans are created for all Client calls, Activities, and Workflow invocations on the Worker. Spans are created and serialized through the server to give one trace for a Workflow Execution.
Metrics in Ruby are configured on the metrics
argument of the telemetry
argument when creating a global Temporalio::Runtime
. That object should be created globally and should be used for all clients; therefore, you should configure this before any other Temporal code.
Set a Prometheus endpoint
The following example exposes a Prometheus endpoint on port 9000.
Temporalio::Runtime.default = Temporalio::Runtime.new(
telemetry: Temporalio::Runtime::TelemetryOptions.new(
metrics: Temporalio::Runtime::MetricsOptions.new(
prometheus: Temporalio::Runtime::PrometheusMetricsOptions.new(
bind_address: '0.0.0.0:9000'
)
)
)
)
Custom metric handling
Instead of Prometheus or OpenTelemetry, an instance of Temporalio::Runtime::MetricBuffer can be provided as the buffer argument to MetricsOptions. You can then periodically call retrieve_updates on the buffer to get metric updates.
Set up tracing
Tracing enables observability into the sequence of calls across your application, including Workflows and Activities.
OpenTelemetry tracing for clients, activities, and workflows can be enabled using the Temporalio::Contrib::OpenTelemetry::TracingInterceptor
. Specifically, when creating a client, set the interceptor like so:
require 'opentelemetry/api'
require 'opentelemetry/sdk'
require 'temporalio/client'
require 'temporalio/contrib/open_telemetry'
# ... assumes my_otel_tracer_provider is a tracer provider created by the user
my_tracer = my_otel_tracer_provider.tracer('my-otel-tracer')
my_client = Temporalio::Client.connect(
'localhost:7233', 'my-namespace',
interceptors: [Temporalio::Contrib::OpenTelemetry::TracingInterceptor.new(my_tracer)]
)
When your Client is connected, spans are created for all Client calls, Activities, and Workflow invocations on the Worker. Spans are created and serialized through the server to give one trace for a Workflow Execution.
Log from a Workflow
How to log from a Workflow using the Go SDK.
Send logs and errors to a logging service, so that when things go wrong, you can see what happened.
Logging enables you to record critical information during code execution. Loggers create an audit trail and capture information about your Workflow's operation. An appropriate logging level depends on your specific needs. During development or troubleshooting, you might use debug or even trace. In production, you might use info or warn to avoid excessive log volume.
The logger supports the following logging levels:
| Level | Use |
| --- | --- |
| TRACE | The most detailed level of logging, used for very fine-grained information. |
| DEBUG | Detailed information, typically useful for debugging purposes. |
| INFO | General information about the application's operation. |
| WARN | Indicates potentially harmful situations or minor issues that don't prevent the application from working. |
| ERROR | Indicates error conditions that might still allow the application to continue running. |
The Temporal SDK core normally uses WARN
as its default logging level.
In Workflow Definitions you can use workflow.GetLogger(ctx)
to write logs.
import (
"context"
"time"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/workflow"
)
// Workflow is a standard workflow definition.
// Note that the Workflow and Activity don't need to care that
// their inputs/results are being compressed.
func Workflow(ctx workflow.Context, name string) (string, error) {
// ...
ctx = workflow.WithActivityOptions(ctx, ao)
// Get the logger from the workflow context.
logger := workflow.GetLogger(ctx)
// Log a message with the key "name" and the given value.
logger.Info("Compressed Payloads workflow started", "name", name)
var result string
// ... run Activities that produce `result`
logger.Info("Compressed Payloads workflow completed.", "result", result)
return result, nil
}
Provide a custom logger
How to provide a custom logger to the Temporal Client using the Go SDK.
The Logger option sets a custom Logger that is used for all logging actions of that instance of the Temporal Client.
Although the Go SDK does not natively support most third-party logging solutions, our friends at Banzai Cloud built the adapter package Logur, which makes it possible to use third-party loggers with minimal overhead. Most of the popular logging solutions already have adapters in Logur, but you can find the full list in the Logur GitHub project.
Here is an example of using Logur to support Logrus:
package main
import (
"go.temporal.io/sdk/client"
"github.com/sirupsen/logrus"
logrusadapter "logur.dev/adapter/logrus"
"logur.dev/logur"
)
func main() {
// ...
logger := logur.LoggerToKV(logrusadapter.New(logrus.New()))
clientOptions := client.Options{
Logger: logger,
}
temporalClient, err := client.Dial(clientOptions)
// ...
}
To get a standard slf4j
logger in your Workflow code, use the Workflow.getLogger
method.
private static final Logger logger = Workflow.getLogger(DynamicDslWorkflow.class);
Logs in replay mode are omitted unless the WorkerFactoryOptions.Builder.setEnableLoggingInReplay(boolean) option is set to true.
How to provide a custom logger
Use a custom logger for logging.
To set a custom logger, supply your own logging implementation and configuration details the same way you would in any other Java application.
You can log from a Workflow using Python's standard library by importing the logging module.
Configure logging with the level you want to expose. The following example sets the logging level to INFO.
logging.basicConfig(level=logging.INFO)
Then, in your Workflow, use the Workflow's logger to write logs at your chosen level. The following example logs a message from the Workflow.
View the source code
in the context of the rest of the application code.
# ...
workflow.logger.info("Workflow input parameter: %s" % name)
Custom logger
Use a custom logger for logging.
Use the built-in Logging facility for Python to set a custom logger.
Logging from Activities
Activities run in the standard Node.js environment and may therefore use any Node.js logger directly.
The Temporal SDK, however, provides a convenient Activity Context logger, which funnels log messages to the Runtime's logger. Attributes from the current Activity context are automatically included as metadata on every log entry emitted using the Activity context logger, and some key events of the Activity's lifecycle are automatically logged (at DEBUG level for most messages; WARN for failures).
Using the Activity Context logger:
import { log } from '@temporalio/activity';
export async function greet(name: string): Promise<string> {
log.info('Log from activity', { name });
return `Hello, ${name}!`;
}
Logging from Workflows
Workflows may not use regular Node.js loggers because:
- Workflows run in a sandboxed environment and cannot do any I/O.
- Workflow code might get replayed at any time, which would result in duplicated log messages.
The Temporal SDK, however, provides a Workflow Context logger, which funnels log messages to the Runtime's logger. Attributes from the current Workflow context are automatically included as metadata on every log entry emitted using the Workflow context logger, and some key events of the Workflow's lifecycle are automatically logged (at DEBUG level for most messages; WARN for failures).
Using the Workflow Context logger:
import { log } from '@temporalio/workflow';
export async function myWorkflow(name: string): Promise<string> {
log.info('Log from workflow', { name });
return `Hello, ${name}!`;
}
The Workflow Context Logger tries to avoid re-emitting log messages during Workflow Replays.
Limitations of Workflow logs
Internally, Workflow logging uses Sinks, and is consequently subject to the same limitations as Sinks. Notably, logged objects must be serializable using the V8 serialization.
What is the Runtime's Logger?
A Temporal Worker may emit logs in various ways, including:
- Messages emitted using the Workflow Context Logger;
- Messages emitted using the Activity Context Logger;
- Messages emitted by the TypeScript SDK Worker itself;
- Messages emitted by the underlying Temporal Core SDK (native code).
All of these messages are internally routed to a single logger object, called the Runtime's Logger.
By default, the Runtime's Logger simply writes messages to the console (that is, the process's STDOUT).
How to customize the Runtime's Logger
A custom Runtime Logger may be registered when the SDK Runtime
is instantiated. This is done only once per process.
To register a custom Runtime Logger, you must explicitly instantiate the Runtime, using the Runtime.install()
function.
For example:
import {
DefaultLogger,
makeTelemetryFilterString,
Runtime,
} from '@temporalio/worker';
// This is your custom Logger.
const logger = new DefaultLogger('WARN', ({ level, message }) => {
console.log(`Custom logger: ${level} — ${message}`);
});
Runtime.install({
logger,
// The following block is optional, but generally desired.
// It allows capturing log messages emitted by the underlying Temporal Core SDK (native code).
// The Telemetry Filter String determines the desired verbosity of messages emitted by the
// Temporal Core SDK itself ("core") and by other native libraries ("other").
telemetryOptions: {
logging: {
filter: makeTelemetryFilterString({ core: 'INFO', other: 'INFO' }),
forward: {},
},
},
});
A common use case for this is to write log messages to a file to be picked up by a collector service, such as the Datadog Agent. For example:
import {
DefaultLogger,
makeTelemetryFilterString,
Runtime,
} from '@temporalio/worker';
import winston from 'winston';
const logger = winston.createLogger({
level: 'info',
format: winston.format.json(),
transports: [new winston.transports.File({ filename: '/path/to/worker.log' })],
});
Runtime.install({
logger,
// The following block is optional, but generally desired.
// It allows capturing log messages emitted by the underlying Temporal Core SDK (native code).
// The Telemetry Filter String determines the desired verbosity of messages emitted by the
// Temporal Core SDK itself ("core") and by other native libraries ("other").
telemetryOptions: {
logging: {
filter: makeTelemetryFilterString({ core: 'INFO', other: 'INFO' }),
forward: {},
},
},
});
Implementing custom Logging-like features based on Workflow Sinks
Sinks enable one-way export of logs, metrics, and traces from the Workflow isolate to the Node.js environment.
Sinks are written as objects with methods. Similar to Activities, they are declared in the Worker and then proxied in Workflow code; sharing types between both helps keep them in sync.
Comparing Sinks and Activities
Sinks are similar to Activities in that they are both registered on the Worker and proxied into the Workflow. However, they differ from Activities in important ways:
- A sink function doesn't return any value back to the Workflow and cannot be awaited.
- A sink call isn't recorded in the Event History of a Workflow Execution (no timeouts or retries).
- A sink function always runs on the same Worker that runs the Workflow Execution it's called from.
Declare the sink interface
Explicitly declaring a sink's interface is optional but is useful for ensuring type safety in subsequent steps:
packages/test/src/workflows/log-sink-tester.ts
import type { Sinks } from '@temporalio/workflow';
export interface CustomLoggerSinks extends Sinks {
customLogger: {
info(message: string): void;
};
}
Implement sinks
Implementing sinks is a two-step process.
Implement and inject the Sink function into a Worker
import { InjectedSinks, Worker } from '@temporalio/worker';
import { MySinks } from './workflows';
async function main() {
const sinks: InjectedSinks<MySinks> = {
alerter: {
alert: {
fn(workflowInfo, message) {
console.log('sending SMS alert!', {
workflowId: workflowInfo.workflowId,
workflowRunId: workflowInfo.runId,
message,
});
},
callDuringReplay: false, // The default
},
},
};
const worker = await Worker.create({
workflowsPath: require.resolve('./workflows'),
taskQueue: 'sinks',
sinks,
});
await worker.run();
console.log('Worker gracefully shutdown');
}
main().catch((err) => {
console.error(err);
process.exit(1);
});
- Sink function implementations are passed as an object into WorkerOptions.
- You can specify whether you want the injected function to be called during Workflow replay by setting the callDuringReplay option.
Proxy and call a sink function from a Workflow
packages/test/src/workflows/log-sample.ts
import * as wf from '@temporalio/workflow';
export async function logSampleWorkflow(): Promise<void> {
wf.log.info('Workflow execution started');
}
Some important features of the InjectedSinkFunction interface:
- Injected WorkflowInfo argument: The first argument of a Sink function implementation is a workflowInfo object that contains useful metadata.
- Limited argument types: The remaining Sink function arguments are copied between the sandbox and the Node.js environment using the structured clone algorithm.
- No return value: To prevent breaking determinism, Sink functions cannot return values to the Workflow.
Advanced: Performance considerations and non-blocking Sinks
The injected sink function contributes to the overall Workflow Task processing duration.
- If you have a long-running sink function, such as one that tries to communicate with external services, you might start seeing Workflow Task timeouts.
- The effect is multiplied when using callDuringReplay: true and replaying long Workflow histories, because the Workflow Task timer starts when the first history page is delivered to the Worker.
How to provide a custom logger
Use a custom logger for logging.
Logging in Workers and Clients
The Worker comes with a default logger, which logs any messages of level INFO and higher to STDERR using console.error.
The log levels, in increasing order of severity, are TRACE, DEBUG, INFO, WARN, and ERROR.
Customizing the default logger
Temporal uses a DefaultLogger
that implements the basic interface:
import { DefaultLogger, Runtime } from '@temporalio/worker';
const logger = new DefaultLogger('WARN', ({ level, message }) => {
console.log(`Custom logger: ${level} — ${message}`);
});
Runtime.install({ logger });
The previous code example sets the default logger to log only messages with level WARN
and higher.
Accumulate logs for testing and reporting
import { DefaultLogger, LogEntry, LogLevel } from '@temporalio/worker';
const logs: LogEntry[] = [];
const logger = new DefaultLogger(LogLevel.TRACE, (entry) => logs.push(entry));
logger.debug('hey', { a: 1 });
logger.info('ho');
logger.warn('lets', { a: 1 });
logger.error('go');
A common logging use case is logging to a file to be picked up by a collector like the Datadog Agent.
import { Runtime } from '@temporalio/worker';
import winston from 'winston';
const logger = winston.createLogger({
level: 'info',
format: winston.format.json(),
transports: [new winston.transports.File({ filename: '/path/to/worker.log' })],
});
Runtime.install({ logger });
To get a PSR-3 compatible logger in your Workflow code, use the Workflow::getLogger()
method.
use Temporal\Workflow;
#[Workflow\WorkflowInterface]
class MyWorkflow
{
#[Workflow\WorkflowMethod]
public function execute(string $param): \Generator
{
Workflow::getLogger()->info('Workflow started', ['parameter' => $param]);
// Your workflow implementation
Workflow::getLogger()->info('Workflow completed');
return 'Done';
}
}
The Workflow logger automatically enriches log context with the current Task Queue name.
Logs in replay mode are omitted unless the enableLoggingInReplay
Worker option is set to true.
$factory = WorkerFactory::create();
$worker = $factory->newWorker('your-task-queue', WorkerOptions::new()
->withEnableLoggingInReplay(true)
);
Default Logger
By default, PHP SDK uses a StderrLogger
that outputs log messages to the standard error stream.
These messages are automatically captured by RoadRunner and incorporated into its logging system with the INFO level, ensuring proper log collection in both development and production environments.
For more details on RoadRunner's logging capabilities, see the RoadRunner Logger documentation.
How to provide a custom logger
You can set a custom PSR-3 compatible logger when creating a Worker:
$myLogger = new MyLogger();
$workerFactory = WorkerFactory::create();
$worker = $workerFactory->newWorker(
taskQueue: 'my-task-queue',
logger: $myLogger,
);
Logging uses the .NET standard logging APIs.
The LoggerFactory
can be set in the client.
The following example shows logging on the console and sets the level to Information.
var client = await TemporalClient.ConnectAsync(new("localhost:7233")
{
LoggerFactory = LoggerFactory.Create(builder =>
builder.
AddSimpleConsole(options => options.TimestampFormat = "[HH:mm:ss] ").
SetMinimumLevel(LogLevel.Information)),
});
You can log from a Workflow using Workflow.Logger, which is an instance of .NET's ILogger.
Workflow.Logger.LogInformation("Given name: {Name}", name);
Logging uses the Ruby standard logging APIs.
The logger
can be set when connecting a client.
The following example shows logging on the console and sets the level to INFO.
require 'logger'
require 'temporalio/client'
my_client = Temporalio::Client.connect(
'localhost:7233', 'my-namespace',
logger: Logger.new($stdout, level: Logger::INFO)
)
You can log from a Workflow using Temporalio::Workflow.logger, which is a special instance of Ruby's Logger that appends Workflow details to every log and does not log during replay.
Temporalio::Workflow.logger.info("Some log #{some_value}")
There's also one for use in activities that appends Activity details to every log:
Temporalio::Activity::Context.current.logger.info("Some log #{some_value}")
Visibility APIs
The term Visibility, within the Temporal Platform, refers to the subsystems and APIs that enable an operator to view Workflow Executions that currently exist within a Temporal Service.
Search Attributes
The typical method of retrieving a Workflow Execution is by its Workflow Id.
However, sometimes you'll want to retrieve one or more Workflow Executions based on another property. For example, imagine you want to get all Workflow Executions of a certain type that have failed within a time range, so that you can start new ones with the same arguments.
You can do this with Search Attributes.
- Default Search Attributes like WorkflowType, StartTime, and ExecutionStatus are automatically added to Workflow Executions.
- Custom Search Attributes can contain their own domain-specific data (like customerId or numItems).
  - A few generic Custom Search Attributes like CustomKeywordField and CustomIntField are created by default in Temporal's Docker Compose.
The steps to using custom Search Attributes are:
- Create a new Search Attribute in your Temporal Service using temporal operator search-attribute create or the Cloud UI.
- Set the value of the Search Attribute for a Workflow Execution:
  - On the Client by including it as an option when starting the Execution.
  - In the Workflow by calling UpsertSearchAttributes.
- Read the value of the Search Attribute:
  - On the Client by calling DescribeWorkflow.
  - In the Workflow by looking at WorkflowInfo (see the sketch after this list).
- Query Workflow Executions by the Search Attribute using a List Filter:
  - In the Temporal CLI.
  - In code by calling ListWorkflowExecutions.
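For instance, a minimal sketch of reading a Search Attribute from inside a Go Workflow via WorkflowInfo (the attribute name and type are illustrative):
info := workflow.GetInfo(ctx)
if payload, ok := info.SearchAttributes.GetIndexedFields()["CustomIntField"]; ok {
	var value int64
	if err := converter.GetDefaultDataConverter().FromPayload(payload, &value); err != nil {
		return err
	}
	// use value...
}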
Here is how to query Workflow Executions:
The ListWorkflow() function retrieves a list of Workflow Executions that match the Search Attributes of a given List Filter. The metadata returned from the Visibility store can be used to get a Workflow Execution's history and details from the Persistence store.
Use a List Filter to define a request to pass into ListWorkflow().
request := &workflowservice.ListWorkflowExecutionsRequest{ Query: "CloseTime = missing" }
This request value returns only open Workflows.
For more List Filter examples, see the examples provided for List Filters in the Temporal Visibility guide.
resp, err := temporalClient.ListWorkflow(context.Background(), request)
if err != nil {
return err
}
fmt.Println("First page of results:")
for _, exec := range resp.Executions {
fmt.Printf("Workflow ID %v\n", exec.Execution.WorkflowId)
}
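The response also includes a NextPageToken. A minimal sketch of paging through the remaining results, reusing the request and response variables from above:
// Keep fetching pages until the server returns an empty token.
for len(resp.NextPageToken) > 0 {
	request.NextPageToken = resp.NextPageToken
	resp, err = temporalClient.ListWorkflow(context.Background(), request)
	if err != nil {
		return err
	}
	for _, exec := range resp.Executions {
		fmt.Printf("Workflow ID %v\n", exec.Execution.WorkflowId)
	}
}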
The typical method of retrieving a Workflow Execution is by its Workflow Id.
However, sometimes you'll want to retrieve one or more Workflow Executions based on another property. For example, imagine you want to get all Workflow Executions of a certain type that have failed within a time range, so that you can start new ones with the same arguments.
You can do this with Search Attributes.
- Default Search Attributes like WorkflowType, StartTime, and ExecutionStatus are automatically added to Workflow Executions.
- Custom Search Attributes can contain their own domain-specific data (like customerId or numItems).
  - A few generic Custom Search Attributes like CustomKeywordField and CustomIntField are created by default in Temporal's Docker Compose.
The steps to using custom Search Attributes are:
- Create a new Search Attribute in your Temporal Service using temporal operator search-attribute create or the Cloud UI.
- Set the value of the Search Attribute for a Workflow Execution:
  - On the Client by including it as an option when starting the Execution.
  - In the Workflow by calling upsertTypedSearchAttributes.
- Read the value of the Search Attribute:
  - On the Client by calling DescribeWorkflow.
  - In the Workflow by looking at WorkflowInfo.
- Query Workflow Executions by the Search Attribute using a List Filter:
  - In the Temporal CLI.
  - In code by calling ListWorkflowExecutions.
The typical method of retrieving a Workflow Execution is by its Workflow Id.
However, sometimes you'll want to retrieve one or more Workflow Executions based on another property. For example, imagine you want to get all Workflow Executions of a certain type that have failed within a time range, so that you can start new ones with the same arguments.
You can do this with Search Attributes.
- Default Search Attributes like WorkflowType, StartTime, and ExecutionStatus are automatically added to Workflow Executions.
- Custom Search Attributes can contain their own domain-specific data (like customerId or numItems).
  - A few generic Custom Search Attributes like CustomKeywordField and CustomIntField are created by default in Temporal's Docker Compose.
The steps to using custom Search Attributes are:
- Create a new Search Attribute in your Temporal Service in the Temporal CLI or Web UI. For example:
  temporal operator search-attribute create --name CustomKeywordField --type Text
  - Replace CustomKeywordField with the name of your Search Attribute.
  - Replace Text with the type value associated with your Search Attribute: Text | Keyword | Int | Double | Bool | Datetime | KeywordList.
- Set the value of the Search Attribute for a Workflow Execution:
  - On the Client by including it as an option when starting the Execution.
  - In the Workflow by calling upsert_search_attributes.
- Read the value of the Search Attribute:
  - On the Client by calling DescribeWorkflow.
  - In the Workflow by looking at WorkflowInfo.
- Query Workflow Executions by the Search Attribute using a List Filter:
  - In the Temporal CLI.
  - In code by calling ListWorkflowExecutions.
Here is how to query Workflow Executions:
Use the list_workflows() method on the Client handle and pass a List Filter as an argument to filter the listed Workflows.
View the source code
in the context of the rest of the application code.
# ...
async for workflow in client.list_workflows('WorkflowType="GreetingWorkflow"'):
print(f"Workflow: {workflow.id}")
The typical method of retrieving a Workflow Execution is by its Workflow Id.
However, sometimes you'll want to retrieve one or more Workflow Executions based on another property. For example, imagine you want to get all Workflow Executions of a certain type that have failed within a time range, so that you can start new ones with the same arguments.
You can do this with Search Attributes.
- Default Search Attributes like WorkflowType, StartTime, and ExecutionStatus are automatically added to Workflow Executions.
- Custom Search Attributes can contain their own domain-specific data (like customerId or numItems).
  - A few generic Custom Search Attributes like CustomKeywordField and CustomIntField are created by default in Temporal's Docker Compose.
The steps to using custom Search Attributes are:
- Create a new Search Attribute in your Temporal Service using temporal operator search-attribute create or the Cloud UI.
- Set the value of the Search Attribute for a Workflow Execution:
  - On the Client by including it as an option when starting the Execution.
  - In the Workflow by calling UpsertSearchAttributes.
- Read the value of the Search Attribute:
  - On the Client by calling DescribeWorkflow.
  - In the Workflow by looking at WorkflowInfo.
- Query Workflow Executions by the Search Attribute using a List Filter:
  - With the Temporal CLI.
  - In code by calling ListWorkflowExecutions.
Here is how to query Workflow Executions:
Use WorkflowService.listWorkflowExecutions:
import { Connection } from '@temporalio/client';
const connection = await Connection.connect();
const response = await connection.workflowService.listWorkflowExecutions({
query: `ExecutionStatus = "Running"`,
});
where query
is a List Filter.
The typical method of retrieving a Workflow Execution is by its Workflow Id.
However, sometimes you'll want to retrieve one or more Workflow Executions based on another property. For example, imagine you want to get all Workflow Executions of a certain type that have failed within a time range, so that you can start new ones with the same arguments.
You can do this with Search Attributes.
- Default Search Attributes like WorkflowType, StartTime, and ExecutionStatus are automatically added to Workflow Executions.
- Custom Search Attributes can contain their own domain-specific data (like customerId or numItems).
  - A few generic Custom Search Attributes like CustomKeywordField and CustomIntField are created by default in Temporal's Docker Compose.
The steps to using custom Search Attributes are:
- Create a new Search Attribute in your Temporal Service using temporal operator search-attribute create or the Cloud UI.
- Set the value of the Search Attribute for a Workflow Execution:
  - On the Client by including it as an option when starting the Execution.
  - In the Workflow by calling UpsertSearchAttributes.
- Read the value of the Search Attribute:
  - On the Client by calling DescribeWorkflow.
  - In the Workflow by looking at WorkflowInfo.
- Query Workflow Executions by the Search Attribute using a List Filter:
  - In the Temporal CLI.
  - In code by calling ListWorkflowExecutions.
Here is how to query Workflow Executions:
Use the listWorkflowExecutions() method on the Client and pass a List Filter as an argument to filter the listed Workflows.
The result is an iterable paginator, so you can use the foreach
loop to iterate over the results.
$paginator = $workflowClient->listWorkflowExecutions('WorkflowType="GreetingWorkflow"');
foreach ($paginator as $info) {
echo "Workflow ID: {$info->execution->getID()}\n";
}
The typical method of retrieving a Workflow Execution is by its Workflow Id.
However, sometimes you'll want to retrieve one or more Workflow Executions based on another property. For example, imagine you want to get all Workflow Executions of a certain type that have failed within a time range, so that you can start new ones with the same arguments.
You can do this with Search Attributes.
- Default Search Attributes like WorkflowType, StartTime, and ExecutionStatus are automatically added to Workflow Executions.
- Custom Search Attributes can contain their own domain-specific data (like customerId or numItems).
- A few generic Custom Search Attributes like CustomKeywordField and CustomIntField are created by default in Temporal's Docker Compose.
The steps to using custom Search Attributes are:
- Create a new Search Attribute in your Temporal Service in the CLI or Web UI. For example:
  temporal operator search-attribute create --name CustomKeywordField --type Text
  - Replace CustomKeywordField with the name of your Search Attribute.
  - Replace Text with the type value associated with your Search Attribute: Text | Keyword | Int | Double | Bool | Datetime | KeywordList.
- Set the value of the Search Attribute for a Workflow Execution:
  - On the Client by including it as an option when starting the Execution.
  - In the Workflow by calling UpsertTypedSearchAttributes.
- Read the value of the Search Attribute:
  - On the Client by calling Describe on a WorkflowHandle.
  - In the Workflow by looking at WorkflowInfo.
- Query Workflow Executions by the Search Attribute using a List Filter:
  - In the Temporal CLI.
  - In code by calling ListWorkflowsAsync.
List Workflow Executions
How to list Workflow Executions using the .NET SDK
Use the ListWorkflowsAsync() method on the Client and pass a List Filter as an argument to filter the listed Workflows. The result is an async enumerable.
await foreach (var wf in client.ListWorkflowsAsync("WorkflowType='GreetingWorkflow'"))
{
Console.WriteLine("Workflow: {0}", wf.Id);
}
The typical method of retrieving a Workflow Execution is by its Workflow Id.
However, sometimes you'll want to retrieve one or more Workflow Executions based on another property. For example, imagine you want to get all Workflow Executions of a certain type that have failed within a time range, so that you can start new ones with the same arguments.
You can do this with Search Attributes.
- Default Search Attributes like WorkflowType, StartTime, and ExecutionStatus are automatically added to Workflow Executions.
- Custom Search Attributes can contain their own domain-specific data (like customerId or numItems).
- A few generic Custom Search Attributes like CustomKeywordField and CustomIntField are created by default in Temporal's Docker Compose.
The steps to using custom Search Attributes are:
- Create a new Search Attribute in your Temporal Service in the CLI or Web UI. For example:
  temporal operator search-attribute create --name CustomKeywordField --type Text
  - Replace CustomKeywordField with the name of your Search Attribute.
  - Replace Text with the type value associated with your Search Attribute: Text | Keyword | Int | Double | Bool | Datetime | KeywordList.
- Set the value of the Search Attribute for a Workflow Execution:
  - On the Client by including it as an argument when starting the Execution.
  - In the Workflow by calling Temporalio::Workflow.upsert_search_attributes.
- Read the value of the Search Attribute:
  - On the Client by calling describe on a WorkflowHandle.
  - In the Workflow by looking at Temporalio::Workflow.search_attributes.
- Query Workflow Executions by the Search Attribute using a List Filter:
  - In the Temporal CLI.
  - In code by calling list_workflows.
List Workflow Executions
Use the list_workflows method on the Client and pass a List Filter as an argument to filter the listed Workflows. The result is a lazy enumerator/enumerable.
my_client.list_workflows("WorkflowType='GreetingWorkflow'").each do |wf|
puts "Workflow: #{wf.id}"
end
Set custom Search Attributes
After you've created custom Search Attributes in your Temporal Service (using the temporal operator search-attribute create
command or the Cloud UI), you can set the values of the custom Search Attributes when starting a Workflow.
Provide key-value pairs in StartWorkflowOptions.SearchAttributes. Search Attributes are represented as map[string]interface{}.
The values in the map must correspond to the Search Attribute's value type:
- Bool = bool
- Datetime = time.Time
- Double = float64
- Int = int64
- Keyword = string
- Text = string
If you had custom Search Attributes CustomerId of type Keyword and MiscData of type Text, you would provide string values:
func (c *Client) CallYourWorkflow(ctx context.Context, workflowID string, payload map[string]interface{}) error {
// ...
searchAttributes := map[string]interface{}{
"CustomerId": payload["customer"],
"MiscData": payload["miscData"],
}
options := client.StartWorkflowOptions{
SearchAttributes: searchAttributes,
// ...
}
we, err := c.Client.ExecuteWorkflow(ctx, options, app.YourWorkflow, payload)
// ...
}
After you've created custom Search Attributes in your Temporal Service (using temporal operator search-attribute create
or the Cloud UI), you can set the values of the custom Search Attributes when starting a Workflow.
When starting a Workflow Execution with your Client, include the Custom Search Attribute in the options using WorkflowOptions.newBuilder().setTypedSearchAttributes()
:
// In a shared constants file, so all files have access
public static final SearchAttributeKey<Boolean> IS_ORDER_FAILED = SearchAttributeKey.forBoolean("isOrderFailed");
...
// In main
WorkflowOptions options = WorkflowOptions.newBuilder()
.setWorkflowId(workflowID)
.setTaskQueue(Constants.TASK_QUEUE_NAME)
.setTypedSearchAttributes(generateSearchAttributes())
.build();
PizzaWorkflow workflow = client.newWorkflowStub(PizzaWorkflow.class, options);
...
// Further down in the file
private static SearchAttributes generateSearchAttributes(){
return SearchAttributes.newBuilder().set(Constants.IS_ORDER_FAILED, false).build();
}
Each Search Attribute is set with a SearchAttributeKey, which pairs a custom attribute name with a specific value type. Currently, the following types are supported:
- Boolean
- Double
- Long
- KeyWord
- KeyWordList
- Text
In this example, isOrderFailed is set as a Search Attribute. This attribute is useful for querying Workflows based on the success or failure of customer orders.
After you've created custom Search Attributes in your Temporal Service (using temporal operator search-attribute create
or the Cloud UI), you can set the values of the custom Search Attributes when starting a Workflow.
Use SearchAttributeKey
to create your Search Attributes. Then, when starting a Workflow execution using client.start_workflow()
, include the Custom Search Attributes by passing instances of SearchAttributePair()
containing each of your keys and starting values to a parameter called search_attributes
.
If you had Custom Search Attributes CustomerId of type Keyword and MiscData of type Text, you could provide these starting values:
customer_id_key = SearchAttributeKey.for_keyword("CustomerId")
misc_data_key = SearchAttributeKey.for_text("MiscData")
handle = await client.start_workflow(
GreetingWorkflow.run,
id="search-attributes-workflow-id",
task_queue="search-attributes-task-queue",
search_attributes=TypedSearchAttributes([
SearchAttributePair(customer_id_key, "customer_1"),
SearchAttributePair(misc_data_key, "customer_1_data")
]),
)
In this example, CustomerId
and MiscData
are set as Search Attributes.
These attributes are useful for querying Workflows based on the customer ID or the date the order was placed.
After you've created custom Search Attributes in your Temporal Service (using temporal operator search-attribute create
or the Cloud UI), you can set the values of the custom Search Attributes when starting a Workflow.
Use WorkflowOptions.searchAttributes.
search-attributes/src/client.ts
const handle = await client.workflow.start(example, {
taskQueue: 'search-attributes',
workflowId: 'search-attributes-example-0',
searchAttributes: {
CustomIntField: [2],
CustomKeywordField: ['keywordA', 'keywordB'],
CustomBoolField: [true],
CustomDatetimeField: [new Date()],
CustomStringField: [
'String field is for text. When queried, it will be tokenized for partial match. StringTypeField cannot be used in Order By',
],
},
});
const { searchAttributes } = await handle.describe();
The type of searchAttributes
is Record<string, string[] | number[] | boolean[] | Date[]>
.
After you've created custom Search Attributes in your Temporal Service (using temporal operator search-attribute create
or the Cloud UI), you can set the values of the custom Search Attributes when starting a Workflow.
To set custom Search Attributes, use the withTypedSearchAttributes
method on WorkflowOptions
for a Workflow stub.
Typed search attributes are a TypedSearchAttributes
collection.
$keyDestinationTime = SearchAttributeKey::forDatetime('DestinationTime');
$keyOrderId = SearchAttributeKey::forKeyword('OrderId');
$workflow = $workflowClient->newWorkflowStub(
OrderWorkflowInterface::class,
WorkflowOptions::new()
->withWorkflowExecutionTimeout('10 minutes')
->withTypedSearchAttributes(
TypedSearchAttributes::empty()
->withValue($keyOrderId, $orderid)
->withValue($keyDestinationTime, new \DateTimeImmutable('2028-11-05T00:10:07Z'))
),
);
After you've created custom Search Attributes in your Temporal Service (using temporal operator search-attribute create
or the Cloud UI), you can set the values of the custom Search Attributes when starting a Workflow.
To set custom Search Attributes, use the TypedSearchAttributes
property on WorkflowOptions
for StartWorkflowAsync
or ExecuteWorkflowAsync
.
Typed search attributes are a SearchAttributeCollection
created with a builder.
// This only needs to be created once, so it is common to make it a static readonly even though we
// create inline here for demonstration
var myKeywordAttributeKey = SearchAttributeKey.CreateKeyword("MyKeywordAttribute");
// Start workflow with the search attribute collection
var handle = await client.StartWorkflowAsync(
(MyWorkflow wf) => wf.RunAsync(),
new(id: "my-workflow-id", taskQueue: "my-task-queue")
{
TypedSearchAttributes = new SearchAttributeCollection.Builder().
Set(myKeywordAttributeKey, "SomeKeywordValue").
ToSearchAttributeCollection(),
});
After you've created custom Search Attributes in your Temporal Service (using temporal operator search-attribute create
or the Cloud UI), you can set the values of the custom Search Attributes when starting a Workflow.
To set custom Search Attributes, use the search_attributes parameter of start_workflow or execute_workflow.
Keys should be predefined for reuse.
# Predefined search attribute key, usually a global somewhere
MY_KEYWORD_KEY = Temporalio::SearchAttributes::Key.new(
'my-keyword',
Temporalio::SearchAttributes::IndexedValueType::KEYWORD
)
# ...
# Start workflow with the search attribute set
handle = my_client.start_workflow(
MyWorkflow, 'some-input',
id: 'my-workflow-id', task_queue: 'my-task-queue',
search_attributes: Temporalio::SearchAttributes.new({ MY_KEYWORD_KEY => 'some-value' })
)
Upsert Search Attributes
You can upsert Search Attributes to add or update Search Attributes from within Workflow code.
In advanced cases, you may want to dynamically update these attributes as the Workflow progresses. UpsertSearchAttributes adds or updates Search Attributes from within Workflow code, merging the new attributes into the Workflow's existing Search Attribute map.
Consider this example Workflow code:
func YourWorkflow(ctx workflow.Context, input string) error {
attr1 := map[string]interface{}{
"CustomIntField": 1,
"CustomBoolField": true,
}
workflow.UpsertSearchAttributes(ctx, attr1)
attr2 := map[string]interface{}{
"CustomIntField": 2,
"CustomKeywordField": "seattle",
}
workflow.UpsertSearchAttributes(ctx, attr2)
return nil
}
After the second call to UpsertSearchAttributes
, the map will contain:
map[string]interface{}{
"CustomIntField": 2, // last update wins
"CustomBoolField": true,
"CustomKeywordField": "seattle",
}
Within the Workflow code, you can dynamically add or update Search Attributes using upsertTypedSearchAttributes
.
This method is particularly useful for Workflows whose attributes need to change based on internal logic or external events.
import io.temporal.workflow.Workflow;
...
// Existing Workflow Logic
Map<String, Object> searchAttribute = new HashMap<>();
Distance distance;
try {
distance = activities.getDistance(address);
searchAttribute.put("isOrderFailed", false);
Workflow.upsertTypedSearchAttributes(Constants.IS_ORDER_FAILED.valueSet(false));
} catch (NullPointerException e) {
searchAttribute.put("isOrderFailed", true);
Workflow.upsertTypedSearchAttributes(Constants.IS_ORDER_FAILED.valueSet(true));
throw new NullPointerException("Unable to get distance");
}
You can upsert Search Attributes to add or update Search Attributes from within Workflow code.
To upsert custom Search Attributes, call the upsert_search_attributes() method with a TypedSearchAttributes() object containing a SearchAttributePair() for each of your keys and its new value:
workflow.upsert_search_attributes(TypedSearchAttributes([
SearchAttributePair(customer_id_key, "customer_2")
]))
You can upsert Search Attributes to add or update Search Attributes from within Workflow code.
Inside a Workflow, we can read from WorkflowInfo.searchAttributes
and call upsertSearchAttributes
:
search-attributes/src/workflows.ts
export async function example(): Promise<SearchAttributes> {
const customInt =
(workflowInfo().searchAttributes.CustomIntField?.[0] as number) || 0;
upsertSearchAttributes({
// overwrite the existing CustomIntField: [2]
CustomIntField: [customInt + 1],
// delete the existing CustomBoolField: [true]
CustomBoolField: [],
// add a new value
CustomDoubleField: [3.14],
});
return workflowInfo().searchAttributes;
}
Within the Workflow code, you can dynamically add or update Search Attributes using upsertTypedSearchAttributes
.
This method is particularly useful for Workflows whose attributes need to change based on internal logic or external events.
#[Workflow\UpdateMethod]
public function postponeDestinationTime(\DateInterval $interval)
{
// Get the key for the DestinationTime attribute
$keyDestinationTime = SearchAttributeKey::forDatetime('DestinationTime');
/** @var DateTimeImmutable $destinationTime */
$destinationTime = Workflow::getInfo()->typedSearchAttributes->get($keyDestinationTime);
Workflow::upsertTypedSearchAttributes(
$keyDestinationTime->valueSet($destinationTime->add($interval)),
);
}
You can upsert Search Attributes to add, update, or remove Search Attributes from within Workflow code.
To upsert custom Search Attributes, use the UpsertTypedSearchAttributes()
method with a set of updates.
Keys can be predefined for reuse.
// These only need to be created once, so it is common to make them static readonly even though we
// create inline here for demonstration
var myKeywordAttributeKey = SearchAttributeKey.CreateKeyword("MyKeywordAttribute");
var myTextAttributeKey = SearchAttributeKey.CreateText("MyTextAttribute");
// Add/Update the keyword one and remove the text one
Workflow.UpsertTypedSearchAttributes(
myKeywordAttributeKey.ValueSet("SomeKeywordValue"),
myTextAttributeKey.ValueUnset());
You can upsert Search Attributes to add, update, or remove Search Attributes from within Workflow code.
To upsert custom Search Attributes, use the upsert_search_attributes
method with a set of updates.
Keys should be predefined for reuse.
# Predefined search attribute key, usually a global somewhere
MY_KEYWORD_KEY = Temporalio::SearchAttributes::Key.new(
'my-keyword',
Temporalio::SearchAttributes::IndexedValueType::KEYWORD
)
# ...
class MyWorkflow < Temporalio::Workflow::Definition
def execute
# ...
Temporalio::Workflow.upsert_search_attributes(MY_KEYWORD_KEY.value_set('some-new-value'))
# ...
end
end
Remove a Search Attribute from a Workflow
To remove a Search Attribute that was previously set, set it to an empty array: [].
There is no support for removing a field. However, to achieve a similar effect, set the field to some placeholder value. For example, you could set CustomKeywordField to impossibleVal. Then searching CustomKeywordField != 'impossibleVal' will match Workflows with CustomKeywordField not equal to impossibleVal, which includes Workflows without the CustomKeywordField set.
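A minimal sketch of this workaround from within Workflow code, assuming CustomKeywordField was set earlier in the Execution:
// Overwrite the attribute with the sentinel value to emulate removal.
err := workflow.UpsertSearchAttributes(ctx, map[string]interface{}{
	"CustomKeywordField": "impossibleVal",
})
if err != nil {
	return err
}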
To remove a Search Attribute that was previously set, set it to an empty Map.
// In a shared constants file, so all files have access
public static final SearchAttributeKey<Boolean> IS_ORDER_FAILED = SearchAttributeKey.forBoolean("isOrderFailed");
...
Workflow.upsertTypedSearchAttributes(Constants.IS_ORDER_FAILED.valueUnset());
To remove a Search Attribute that was previously set, set it to an empty array: [].
workflow.upsert_search_attributes(TypedSearchAttributes([
SearchAttributePair(customer_id_key, [])
]))
To remove a Search Attribute that was previously set, set it to an empty array: [].
import { upsertSearchAttributes } from '@temporalio/workflow';
async function yourWorkflow() {
upsertSearchAttributes({ CustomIntField: [1, 2, 3] });
// ... later, to remove:
upsertSearchAttributes({ CustomIntField: [] });
}
To remove a Search Attribute that was previously set, set it to an empty Map.
#[Workflow\UpdateMethod]
public function unsetDestinationTime()
{
// Get the key for the DestinationTime attribute
$keyDestinationTime = SearchAttributeKey::forDatetime('DestinationTime');
Workflow::upsertTypedSearchAttributes(
$keyDestinationTime->valueUnset(),
);
}
The Temporal .NET SDK does not support removing Search Attributes.
The Temporal Ruby SDK does not support removing Search Attributes.