Snakemake logger plugin: stomp

Author: Anil Panta <anilpanta2@gmail.org>

Warning

This plugin is not maintained or reviewed by the official Snakemake organization.

Snakemake Logger Plugin for STOMP

Stream Snakemake workflow events to STOMP message brokers (ActiveMQ, RabbitMQ, Apollo) in real-time for monitoring, audit trails, and event-driven system integration.

Key Features

  • Real-time event streaming to STOMP-compatible message brokers

  • SSL/TLS encryption for secure production deployments

  • Flexible formatters - default flat JSON or JLab Scientific Workflow schema

  • Event filtering - include/exclude specific event types to reduce noise

  • Custom formatters - plug in your own formatter classes

  • Environment variable support for credentials and sensitive configuration

Installation

pip install snakemake-logger-plugin-stomp

Quick Start

Command Line

snakemake --logger stomp \
  --logger-stomp-host localhost \
  --logger-stomp-port 61613 \
  --logger-stomp-user admin \
  --logger-stomp-password admin \
  --logger-stomp-queue /queue/snakemake.events

Profile Configuration

# profiles/stomp/config.yaml
logger:
  stomp:
    host: "activemq.example.com"
    port: 61613
    user: "${STOMP_USER}"
    password: "${STOMP_PASSWORD}"
    queue: "/topic/snakemake.events"

Then run:

snakemake --profile profiles/stomp

Environment Variables

Secure credentials using environment variables:

export SNAKEMAKE_LOGGER_STOMP_HOST=activemq.example.com
export SNAKEMAKE_LOGGER_STOMP_USER=admin
export SNAKEMAKE_LOGGER_STOMP_PASSWORD=secret
snakemake --logger stomp
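
The three variables above follow a visible pattern: the CLI flag, upper-cased, with dashes turned into underscores and a SNAKEMAKE_ prefix. The sketch below only encodes that pattern; env_var_for is a hypothetical helper, and whether every setting can be supplied this way is an assumption not confirmed here.

```python
def env_var_for(cli_flag: str) -> str:
    """Derive the environment variable name that matches a CLI flag.

    Assumption: the naming pattern of the three variables shown above
    (SNAKEMAKE_ prefix, upper case, dashes -> underscores) holds for the flag.
    """
    return "SNAKEMAKE_" + cli_flag.lstrip("-").replace("-", "_").upper()


for flag in ("--logger-stomp-host", "--logger-stomp-user", "--logger-stomp-password"):
    print(f"{flag} -> {env_var_for(flag)}")
```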

Use Cases

  • External monitoring - Send workflow events to centralized monitoring systems

  • Audit trails - Create permanent records of workflow execution in message queues

  • Event-driven automation - Trigger downstream processes based on workflow events

  • Team dashboards - Build real-time dashboards showing workflow status across teams

  • Integration - Connect Snakemake to enterprise message bus architectures

Install the plugin directly with pip or mamba, e.g.:

pip install snakemake-logger-plugin-stomp

Or, if you are using pixi, add the plugin to your pixi.toml. Be careful to put it under the right dependency type based on the plugin’s availability, e.g.:

snakemake-logger-plugin-stomp = "*"

In order to use the plugin, run Snakemake (>=9.0) with the corresponding value for the logger flag:

snakemake --logger stomp ...

with ... being any additional arguments you want to use.

The logger plugin has the following settings (which can be passed via command line, the workflow or environment variables, if provided in the respective columns):

Settings

| CLI argument | Description | Default |
|---|---|---|
| --logger-stomp-host VALUE | STOMP broker hostname | 'localhost' |
| --logger-stomp-port VALUE | STOMP broker port | 61613 |
| --logger-stomp-user VALUE | STOMP broker username for authentication | None |
| --logger-stomp-password VALUE | STOMP broker password for authentication | None |
| --logger-stomp-queue VALUE | STOMP destination (queue or topic path, e.g., /queue/name or /topic/name) | '/queue/snakemake.events' |
| --logger-stomp-formatter-class VALUE | Full Python path to formatter class (e.g., module.submodule.ClassName) | 'snakemake_logger_plugin_stomp.formatters.DefaultJSONFormatter' |
| --logger-stomp-include-events VALUE | Comma-separated list of events to include (e.g., "WORKFLOW_STARTED,JOB_FINISHED") | None |
| --logger-stomp-exclude-events VALUE | Comma-separated list of events to exclude (e.g., "DEBUG,PROGRESS") | None |
| --logger-stomp-use-ssl VALUE | Enable SSL/TLS encryption for broker connection | False |
| --logger-stomp-cert-file VALUE | Path to SSL certificate file for client authentication | None |
| --logger-stomp-key-file VALUE | Path to SSL private key file for client authentication | None |
| --logger-stomp-heartbeat-send VALUE | STOMP heartbeat send interval in milliseconds (0 to disable) | 10000 |
| --logger-stomp-heartbeat-receive VALUE | STOMP heartbeat receive interval in milliseconds (0 to disable) | 10000 |
| --logger-stomp-fail-on-connection-error VALUE | Fail workflow if initial connection to broker fails (strict audit mode) | False |

Advanced Configuration

SSL/TLS Encryption

For production brokers requiring encrypted connections:

Profile configuration:

# profiles/production/config.yaml
logger:
  stomp:
    host: "secure-broker.example.com"
    port: 61614
    use_ssl: true
    cert_file: "/path/to/client.crt"
    key_file: "/path/to/client.key"
    queue: "/topic/snakemake.secure"

Command line:

snakemake --logger stomp \
  --logger-stomp-host secure-broker.example.com \
  --logger-stomp-port 61614 \
  --logger-stomp-use-ssl \
  --logger-stomp-cert-file /path/to/client.crt \
  --logger-stomp-key-file /path/to/client.key

Event Filtering

Control which events generate messages to reduce noise or focus on specific workflow stages.

Whitelist Approach

Only send specific events:

snakemake --logger stomp \
  --logger-stomp-include-events "WORKFLOW_STARTED,JOB_STARTED,JOB_FINISHED,WORKFLOW_FINISHED"

Blacklist Approach

Exclude noisy events:

snakemake --logger stomp \
  --logger-stomp-exclude-events "DEBUG,PROGRESS,RESOURCES_INFO"

Available events: WORKFLOW_STARTED, WORKFLOW_FINISHED, JOB_STARTED, JOB_FINISHED, JOB_ERROR, DEBUG, PROGRESS, RESOURCES_INFO, and more.
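
To make the include/exclude semantics concrete, here is a minimal sketch of how such a filter could decide whether an event is sent. It is not the plugin's implementation: the function names are hypothetical, and case-insensitive matching and the rule that an event must pass both lists are assumptions.

```python
def parse_event_list(value):
    """Turn "A, b,C" into {"A", "B", "C"}; None or "" means "no filter set"."""
    if not value:
        return None
    return {name.strip().upper() for name in value.split(",") if name.strip()}


def should_send(event, include=None, exclude=None):
    """Return True if an event passes the include list and is not excluded."""
    allowed = parse_event_list(include)
    blocked = parse_event_list(exclude)
    if allowed is not None and event.upper() not in allowed:
        return False  # an include list is set and the event is not on it
    if blocked is not None and event.upper() in blocked:
        return False  # the event is explicitly excluded
    return True


print(should_send("JOB_FINISHED", include="WORKFLOW_STARTED,JOB_FINISHED"))
print(should_send("PROGRESS", exclude="DEBUG,PROGRESS,RESOURCES_INFO"))
```

With only an include list, everything not listed is dropped; with only an exclude list, everything not listed is sent.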

Built-in Formatters

DefaultJSONFormatter (default)

Produces flat, easy-to-parse JSON:

{
  "timestamp": "2024-01-15T14:30:00Z",
  "hostname": "compute-node-01",
  "workflow_id": "550e8400-e29b-41d4-a716-446655440000",
  "event_type": "JOB_FINISHED",
  "level": "INFO",
  "message": "Job 5 completed successfully"
}
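
Because the message is flat, a consumer needs nothing beyond a JSON parser to use it. The sketch below decodes the sample message above into a one-line summary; summarize is a hypothetical helper, and how the body reaches your code depends on the STOMP client you use.

```python
import json

# The sample message from above, as it would arrive in a STOMP frame body.
SAMPLE_BODY = """
{
  "timestamp": "2024-01-15T14:30:00Z",
  "hostname": "compute-node-01",
  "workflow_id": "550e8400-e29b-41d4-a716-446655440000",
  "event_type": "JOB_FINISHED",
  "level": "INFO",
  "message": "Job 5 completed successfully"
}
"""


def summarize(body: str) -> str:
    """Decode one flat event and render it as a single log line."""
    event = json.loads(body)
    return (
        f'{event.get("timestamp")} {event.get("hostname")} '
        f'[{event.get("level")}] {event.get("event_type")}: {event.get("message")}'
    )


print(summarize(SAMPLE_BODY))
```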

JLabSWFFormatter

Nested schema compliant with JLab Scientific Workflow Facility standards:

{
  "schema_ref": "org.jlab.swf.event",
  "schema_version": "1.0.0",
  "event_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "event_type": "SNAKEMAKE_JOB_FINISHED",
  "event_time": "2024-01-15T14:30:00Z",
  "source": {
    "agent_id": "snakemake-compute-node-01",
    "workflow_id": "550e8400-e29b-41d4-a716-446655440000",
    "hostname": "compute-node-01"
  },
  "correlation": {
    "run_number": "12345"
  },
  "payload": {
    "msg": "Job 5 completed successfully",
    "level": "INFO",
    "name": "align_reads",
    "jobid": 5,
    "wildcards": {"sample": "A"},
    "threads": 4,
    "resources": {"mem_mb": 16000}
  }
}

To use JLabSWFFormatter:

snakemake --logger stomp \
  --logger-stomp-formatter-class "snakemake_logger_plugin_stomp.formatters.JLabSWFFormatter"

The SWF_RUN_NUMBER environment variable can be set to populate the correlation.run_number field for tracking related workflow runs.
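
On the consumer side, the nested layout means fields are read from the source, correlation, and payload blocks rather than from the top level. The following sketch pulls out the fields most useful for tracking a run; job_summary is a hypothetical helper and the field names are taken from the sample message above, not from a published schema.

```python
def job_summary(event: dict) -> dict:
    """Collect run, host, and job identifiers from a nested SWF-style event."""
    payload = event.get("payload", {})
    return {
        "run_number": event.get("correlation", {}).get("run_number"),
        "host": event.get("source", {}).get("hostname"),
        "event_type": event.get("event_type"),
        "rule": payload.get("name"),
        "jobid": payload.get("jobid"),
    }


# Trimmed copy of the sample event shown above.
sample = {
    "event_type": "SNAKEMAKE_JOB_FINISHED",
    "source": {"hostname": "compute-node-01"},
    "correlation": {"run_number": "12345"},
    "payload": {"name": "align_reads", "jobid": 5},
}
print(job_summary(sample))
```

Using .get with empty-dict fallbacks keeps the consumer from failing on events that omit a block, for example when no run number was set.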

Custom Formatters

Create your own formatter by implementing the BaseFormatter interface:

# my_formatter.py
from datetime import UTC, datetime  # datetime.UTC requires Python 3.11+

from snakemake_logger_plugin_stomp.formatters import BaseFormatter


class MyFormatter(BaseFormatter):
    def format(self, record, workflow_metadata):
        # Build the event from the log record and the workflow metadata.
        return {
            "ts": datetime.now(UTC).isoformat(),
            "host": workflow_metadata.get("hostname"),
            # Not every record carries an event attribute, so fall back to UNKNOWN.
            "event": str(getattr(record, "event", "UNKNOWN")),
            "msg": record.getMessage() if hasattr(record, "getMessage") else str(record.msg),
            "custom_field": "my_value"
        }

Install your formatter in the same Python environment, then reference it:

snakemake --logger stomp \
  --logger-stomp-formatter-class "my_formatter.MyFormatter"
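
The value passed to --logger-stomp-formatter-class is a dotted path, so the module must be importable from the environment Snakemake runs in. The sketch below shows the usual way such a path is resolved with importlib; it illustrates the general technique and is not taken from the plugin's source. load_class is a hypothetical name.

```python
import collections
import importlib


def load_class(dotted_path: str):
    """Resolve "package.module.ClassName" to the class object."""
    module_name, _, class_name = dotted_path.rpartition(".")
    if not module_name:
        raise ValueError(f"expected 'module.ClassName', got {dotted_path!r}")
    module = importlib.import_module(module_name)  # raises ImportError if missing
    return getattr(module, class_name)  # raises AttributeError if no such class


# Demonstrated with a standard-library class so the example runs anywhere.
print(load_class("collections.OrderedDict"))
```

Running load_class("my_formatter.MyFormatter") in the target environment is a quick way to check that the formatter is importable before starting a workflow.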

Heartbeat Tuning

STOMP heartbeats keep the connection alive and detect network failures. Configure intervals in milliseconds:

logger:
  stomp:
    host: "broker.example.com"
    heartbeat_send: 30000    # Client sends heartbeat every 30s (default: 10000ms = 10s)
    heartbeat_receive: 30000 # Client expects broker heartbeat every 30s (default: 10000ms = 10s)

How it works:

  • heartbeat_send: How often the client sends heartbeat frames to the broker

  • heartbeat_receive: How often the client expects to receive heartbeat frames from the broker

  • Set to 0 to disable heartbeats entirely

Do you need heartbeats for Snakemake?

YES, keep heartbeats enabled (default 10s) if you have:

  • Long-running jobs (hours) with infrequent events

  • Workflows with idle periods waiting on external resources

  • Unreliable networks (cloud, VPN, cross-datacenter connections)

  • Enterprise firewalls/proxies that drop idle TCP connections

  • Production workflows where silent connection loss is unacceptable

NO, you can disable heartbeats (set to 0) if you have:

  • Short workflows (< 30 minutes) with frequent job events

  • Localhost or same-datacenter broker (reliable network)

  • Development/testing environment

  • Jobs that start/finish frequently (events naturally keep connection alive)

Tuning guidance:

  • Default (10000ms = 10s): Good for most production workflows

  • Increase (30000-60000ms): For slow/congested networks to prevent false timeouts

  • Decrease (5000ms): For fast networks where you want quick failure detection

  • Disable (0): Only for local testing with reliable connections

  • Both send and receive values should typically be the same

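  • The effective interval is negotiated with the broker: under the STOMP specification, each direction uses the larger of the client's and the broker's values, and is disabled if either side sets 0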
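
The negotiation rule from the STOMP 1.1/1.2 specification can be written out in a few lines. The sketch assumes heartbeat_send and heartbeat_receive map to the client's two heart-beat header values; negotiate is a hypothetical helper, not a plugin function.

```python
def negotiate(client, server):
    """Return (client_to_server_ms, server_to_client_ms); 0 means disabled.

    client = (cx, cy): client can send every cx ms, wants to receive every cy ms.
    server = (sx, sy): the same two values as announced by the broker.
    """
    cx, cy = client
    sx, sy = server
    # A direction is disabled if the sender cannot send or the receiver does
    # not want heartbeats; otherwise the larger of the two values wins.
    client_to_server = 0 if cx == 0 or sy == 0 else max(cx, sy)
    server_to_client = 0 if sx == 0 or cy == 0 else max(sx, cy)
    return client_to_server, server_to_client


# Client asks for 30s both ways; broker offers 10s send and wants 60s receive.
print(negotiate((30000, 30000), (10000, 60000)))
```

This is why a value set in the profile is an upper bound on heartbeat frequency rather than a guarantee.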

Example: Disable for local testing

logger:
  stomp:
    host: "localhost"
    heartbeat_send: 0
    heartbeat_receive: 0

Queue vs Topic Destinations

STOMP supports both queue (point-to-point) and topic (publish-subscribe) semantics:

# Queue - one consumer receives each message
queue: "/queue/snakemake.events"

# Topic - all subscribers receive each message
queue: "/topic/snakemake.broadcast"

Use queues when you want load balancing across multiple consumers. Use topics when multiple systems need to receive all events (e.g., monitoring + archival + alerting).
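
The difference can be illustrated with a small in-memory simulation. This is a deliberate simplification: real brokers decide how to dispatch queue messages among consumers, and round-robin is used here only as an assumption. deliver is a hypothetical function.

```python
from itertools import cycle


def deliver(destination, messages, consumers):
    """Simulate delivery and return each consumer's inbox."""
    inboxes = {name: [] for name in consumers}
    if destination.startswith("/topic/"):
        # Topic: every subscriber gets a copy of every message.
        for msg in messages:
            for name in consumers:
                inboxes[name].append(msg)
    else:
        # Queue: each message goes to exactly one consumer (round-robin here).
        for msg, name in zip(messages, cycle(consumers)):
            inboxes[name].append(msg)
    return inboxes


events = ["JOB_STARTED", "JOB_FINISHED", "WORKFLOW_FINISHED"]
print(deliver("/queue/snakemake.events", events, ["monitor", "archiver"]))
print(deliver("/topic/snakemake.broadcast", events, ["monitor", "archiver"]))
```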

Complete Production Example

A full production configuration combining SSL, event filtering, the JLab SWF formatter, and heartbeat tuning:

# profiles/production-stomp/config.yaml
logger:
  stomp:
    host: "activemq.prod.internal"
    port: 61614
    user: "${STOMP_USER}"
    password: "${STOMP_PASSWORD}"
    use_ssl: true
    cert_file: "/etc/certs/snakemake.crt"
    key_file: "/etc/certs/snakemake.key"
    queue: "/topic/snakemake.prod"
    formatter_class: "snakemake_logger_plugin_stomp.formatters.JLabSWFFormatter"
    include_events: "WORKFLOW_STARTED,JOB_STARTED,JOB_FINISHED,WORKFLOW_FINISHED,JOB_ERROR"
    heartbeat_send: 30000
    heartbeat_receive: 30000

Usage:

export STOMP_USER=workflow_user
export STOMP_PASSWORD=secure_password
export SWF_RUN_NUMBER=12345
snakemake --profile profiles/production-stomp

Troubleshooting

Connection Issues

If the plugin fails to connect:

  1. Verify the broker is running and accessible: telnet <host> <port>

  2. Check firewall rules allow the STOMP port

  3. Confirm credentials are correct

  4. Review broker logs for authentication errors
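
Where telnet is not installed, the first check can be done from Python with the standard library. The sketch only confirms that a TCP connection can be opened; it does not perform a STOMP handshake or test credentials. can_connect is a hypothetical helper.

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, timed out, unreachable, or name not resolved
        return False


# Default host and port from the settings table; adjust for your broker.
print(can_connect("localhost", 61613))
```

A False result points to steps 1 and 2 (broker down or port blocked); a True result with a failing plugin points to steps 3 and 4.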

SSL Certificate Errors

If SSL connections fail:

  1. Verify certificate paths are correct and readable

  2. Ensure certificates match the broker hostname

  3. Check certificate expiration dates

  4. Try with use_ssl: false first to isolate SSL-specific issues
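
Step 1 can be automated with a short preflight check run as the same user that runs Snakemake. The sketch covers existence and read permission only; hostname matching and expiry still need a certificate tool. check_tls_files is a hypothetical helper.

```python
import os
from pathlib import Path


def check_tls_files(cert_file, key_file):
    """Return a list of problems found with the certificate and key paths."""
    problems = []
    for label, value in (("cert_file", cert_file), ("key_file", key_file)):
        path = Path(value)
        if not path.is_file():
            problems.append(f"{label}: {path} does not exist or is not a file")
        elif not os.access(path, os.R_OK):
            problems.append(f"{label}: {path} is not readable by this user")
    return problems


# Placeholder paths from the SSL example; an empty list means both files passed.
for problem in check_tls_files("/path/to/client.crt", "/path/to/client.key"):
    print(problem)
```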

No Events Appearing

If events aren’t reaching your consumer:

  1. Check event filtering settings (include_events/exclude_events)

  2. Verify the correct destination (queue vs topic)

  3. Confirm consumers are connected to the same queue/topic

  4. Review Snakemake logs for plugin connection errors

Performance Considerations

For workflows with hundreds or thousands of jobs:

  • Use event filtering to reduce message volume

  • Consider using queues instead of topics if you don’t need broadcast

  • Increase heartbeat intervals if network latency is high

  • Monitor broker memory usage and adjust configuration as needed