Snakemake logger plugin: stomp
Warning
This plugin is not maintained and reviewed by the official Snakemake organization.
Snakemake Logger Plugin for STOMP
Stream Snakemake workflow events to STOMP message brokers (ActiveMQ, RabbitMQ, Apollo) in real-time for monitoring, audit trails, and event-driven system integration.
Key Features
Real-time event streaming to STOMP-compatible message brokers
SSL/TLS encryption for secure production deployments
Flexible formatters - default flat JSON or JLab Scientific Workflow schema
Event filtering - include/exclude specific event types to reduce noise
Custom formatters - plugin your own formatter classes
Environment variable support for credentials and sensitive configuration
Installation
pip install snakemake-logger-plugin-stomp
Quick Start
Command Line
snakemake --logger stomp \
--logger-stomp-host localhost \
--logger-stomp-port 61613 \
--logger-stomp-user admin \
--logger-stomp-password admin \
--logger-stomp-queue /queue/snakemake.events
Profile Configuration
# profiles/stomp/config.yaml
logger:
stomp:
host: "activemq.example.com"
port: 61613
user: "${STOMP_USER}"
password: "${STOMP_PASSWORD}"
queue: "/topic/snakemake.events"
Then run:
snakemake --profile profiles/stomp
Environment Variables
Secure credentials using environment variables:
export SNAKEMAKE_LOGGER_STOMP_HOST=activemq.example.com
export SNAKEMAKE_LOGGER_STOMP_USER=admin
export SNAKEMAKE_LOGGER_STOMP_PASSWORD=secret
snakemake --logger stomp
Use Cases
External monitoring - Send workflow events to centralized monitoring systems
Audit trails - Create permanent records of workflow execution in message queues
Event-driven automation - Trigger downstream processes based on workflow events
Team dashboards - Build real-time dashboards showing workflow status across teams
Integration - Connect Snakemake to enterprise message bus architectures
Install this plugin by installing it with pip or mamba directly, e.g.:
pip install snakemake-logger-plugin-stomp
Or, if you are using pixi, add the plugin to your pixi.toml. Be careful to put it under the right dependency type based on the plugin’s availability, e.g.:
snakemake-logger-plugin-stomp = "*"
In order to use the plugin, run Snakemake (>=9.0) with the corresponding value for the logger flag:
snakemake --logger stomp ...
with ... being any additional arguments you want to use.
The logger plugin has the following settings (which can be passed via command line, the workflow or environment variables, if provided in the respective columns):
Advanced Configuration
SSL/TLS Encryption
For production brokers requiring encrypted connections:
Profile configuration:
# profiles/production/config.yaml
logger:
stomp:
host: "secure-broker.example.com"
port: 61614
use_ssl: true
cert_file: "/path/to/client.crt"
key_file: "/path/to/client.key"
queue: "/topic/snakemake.secure"
Command line:
snakemake --logger stomp \
--logger-stomp-host secure-broker.example.com \
--logger-stomp-port 61614 \
--logger-stomp-use-ssl \
--logger-stomp-cert-file /path/to/client.crt \
--logger-stomp-key-file /path/to/client.key
Event Filtering
Control which events generate messages to reduce noise or focus on specific workflow stages.
Whitelist Approach
Only send specific events:
snakemake --logger stomp \
--logger-stomp-include-events "WORKFLOW_STARTED,JOB_STARTED,JOB_FINISHED,WORKFLOW_FINISHED"
Blacklist Approach
Exclude noisy events:
snakemake --logger stomp \
--logger-stomp-exclude-events "DEBUG,PROGRESS,RESOURCES_INFO"
Available events: WORKFLOW_STARTED, WORKFLOW_FINISHED, JOB_STARTED, JOB_FINISHED, JOB_ERROR, DEBUG, PROGRESS, RESOURCES_INFO, and more.
Built-in Formatters
DefaultJSONFormatter (default)
Produces flat, easy-to-parse JSON:
{
"timestamp": "2024-01-15T14:30:00Z",
"hostname": "compute-node-01",
"workflow_id": "550e8400-e29b-41d4-a716-446655440000",
"event_type": "JOB_FINISHED",
"level": "INFO",
"message": "Job 5 completed successfully"
}
JLabSWFFormatter
Nested schema compliant with JLab Scientific Workflow Facility standards:
{
"schema_ref": "org.jlab.swf.event",
"schema_version": "1.0.0",
"event_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
"event_type": "SNAKEMAKE_JOB_FINISHED",
"event_time": "2024-01-15T14:30:00Z",
"source": {
"agent_id": "snakemake-compute-node-01",
"workflow_id": "550e8400-e29b-41d4-a716-446655440000",
"hostname": "compute-node-01"
},
"correlation": {
"run_number": "12345"
},
"payload": {
"msg": "Job 5 completed successfully",
"level": "INFO",
"name": "align_reads",
"jobid": 5,
"wildcards": {"sample": "A"},
"threads": 4,
"resources": {"mem_mb": 16000}
}
}
To use JLabSWFFormatter:
snakemake --logger stomp \
--logger-stomp-formatter-class "snakemake_logger_plugin_stomp.formatters.JLabSWFFormatter"
The SWF_RUN_NUMBER environment variable can be set to populate the correlation.run_number field for tracking related workflow runs.
Custom Formatters
Create your own formatter by implementing the BaseFormatter interface:
# my_formatter.py
from snakemake_logger_plugin_stomp.formatters import BaseFormatter
from datetime import UTC, datetime
class MyFormatter(BaseFormatter):
def format(self, record, workflow_metadata):
return {
"ts": datetime.now(UTC).isoformat(),
"host": workflow_metadata.get("hostname"),
"event": str(getattr(record, "event", "UNKNOWN")),
"msg": record.getMessage() if hasattr(record, "getMessage") else str(record.msg),
"custom_field": "my_value"
}
Install your formatter in the same Python environment, then reference it:
snakemake --logger stomp \
--logger-stomp-formatter-class "my_formatter.MyFormatter"
Heartbeat Tuning
STOMP heartbeats keep the connection alive and detect network failures. Configure intervals in milliseconds:
logger:
stomp:
host: "broker.example.com"
heartbeat_send: 30000 # Client sends heartbeat every 30s (default: 10000ms = 10s)
heartbeat_receive: 30000 # Client expects broker heartbeat every 30s (default: 10000ms = 10s)
How it works:
heartbeat_send: How often the client sends heartbeat frames to the brokerheartbeat_receive: How often the client expects to receive heartbeat frames from the brokerSet to
0to disable heartbeats entirely
Do you need heartbeats for Snakemake?
YES, keep heartbeats enabled (default 10s) if you have:
Long-running jobs (hours) with infrequent events
Workflows with idle periods waiting on external resources
Unreliable networks (cloud, VPN, cross-datacenter connections)
Enterprise firewalls/proxies that drop idle TCP connections
Production workflows where silent connection loss is unacceptable
NO, you can disable heartbeats (set to 0) if you have:
Short workflows (< 30 minutes) with frequent job events
Localhost or same-datacenter broker (reliable network)
Development/testing environment
Jobs that start/finish frequently (events naturally keep connection alive)
Tuning guidance:
Default (10000ms = 10s): Good for most production workflows
Increase (30000-60000ms): For slow/congested networks to prevent false timeouts
Decrease (5000ms): For fast networks where you want quick failure detection
Disable (0): Only for local testing with reliable connections
Both send and receive values should typically be the same
Brokers may negotiate the actual interval based on their own configuration
Example: Disable for local testing
logger:
stomp:
host: "localhost"
heartbeat_send: 0
heartbeat_receive: 0
Queue vs Topic Destinations
STOMP supports both queue (point-to-point) and topic (publish-subscribe) semantics:
# Queue - one consumer receives each message
queue: "/queue/snakemake.events"
# Topic - all subscribers receive each message
queue: "/topic/snakemake.broadcast"
Use queues when you want load balancing across multiple consumers. Use topics when multiple systems need to receive all events (e.g., monitoring + archival + alerting).
Complete Production Example
A full production configuration combining SSL, event filtering, custom formatter, and heartbeat tuning:
# profiles/production-stomp/config.yaml
logger:
stomp:
host: "activemq.prod.internal"
port: 61614
use_ssl: true
cert_file: "/etc/certs/snakemake.crt"
key_file: "/etc/certs/snakemake.key"
queue: "/topic/snakemake.prod"
formatter_class: "snakemake_logger_plugin_stomp.formatters.JLabSWFFormatter"
include_events: "WORKFLOW_STARTED,JOB_STARTED,JOB_FINISHED,WORKFLOW_FINISHED,JOB_ERROR"
heartbeat_send: 30000
heartbeat_receive: 30000
Usage:
export STOMP_USER=workflow_user
export STOMP_PASSWORD=secure_password
export SWF_RUN_NUMBER=12345
snakemake --profile profiles/production-stomp
Troubleshooting
Connection Issues
If the plugin fails to connect:
Verify the broker is running and accessible:
telnet <host> <port>Check firewall rules allow the STOMP port
Confirm credentials are correct
Review broker logs for authentication errors
SSL Certificate Errors
If SSL connections fail:
Verify certificate paths are correct and readable
Ensure certificates match the broker hostname
Check certificate expiration dates
Try with
use_ssl: falsefirst to isolate SSL-specific issues
No Events Appearing
If events aren’t reaching your consumer:
Check event filtering settings (
include_events/exclude_events)Verify the correct destination (queue vs topic)
Confirm consumers are connected to the same queue/topic
Review Snakemake logs for plugin connection errors
Performance Considerations
For workflows with hundreds or thousands of jobs:
Use event filtering to reduce message volume
Consider using queues instead of topics if you don’t need broadcast
Increase heartbeat intervals if network latency is high
Monitor broker memory usage and adjust configuration as needed