User Tools

Site Tools


datacore:enable_resilient_data_transfer_with_rabbitmq

Enable Resilient Data Transfer with RabbitMQ

Rabbit MQ supports secure and reliable server-to-server communication, making it the ideal utiliy for Data Core to employ for process data transfer.

In the following example we consider a network architecture consisting of a source server and a destination server.

  1. A Data Core Node on the source server subscribes to real-time tag-values from a SCADA system (via OPC or Modbus).
  2. Data Core writes the tag-values to a local RabbitMQ queue.
    1. RabbitMQ is configured to persist messages, meaning data can survive a server shutdown.
  3. RabbitMQ pushes encrypted data (messages) downstream to the RabbitMQ queue on the destination server.
    1. Data in transit is encrypted using TLS.
  4. The destination RabbitMQ responds with an ack when the message is successfully enqueued.
    1. The upstream RabbitMQ server will only dequeue the message on reciept of an ack. This provides resiliance to network outage.
    2. App Store Connect on the destinations server reads the message from the queue and archives the tag value to IP Historian

Getting Started

Before we can commence with configuring Data Core, we must prepare the servers and Rabbit MQ for secure communication.

Destination Server Preparation

1. Install RabbitMQ

Refer to RabbitMQ

2. Configure Firewall Rules

Firewall rules must be configured on the destination/downstream server to allow for incoming messages.

Open Windows Defender Firewall and configure two new Inbound Rules:

Name Description Port
AMQP Advanced Message Queue Protocol - employed by RabbitMQ 5672
AMQPS Advanced Message Queue Protocol Secure - employed by RabbitMQ 5671

The AMQP rule should only be employed for testing and can be disabled when not required. We also recommending resticting access to the above rules by specifying the permitted “Remote IP Address” scope.

3. Create a RabbitMQ User

A new user must be defined on the destination/downstream RabbitMQ server. The source will use this for authorization.

  • Open RabbitMQ Administration UI (http://localhost:15672/)
  • Select Admin > Users
  • Add User
    • Name: data_transfer_user
    • Password: <you decide>
    • Allow read/write permissions on required queues

4. Create a Self-Signed CA Certificate

The certificate is used for TLS encryption.

Create folder:

C:\Program Files\RabbitMQ Server\certs

and add the following certificate files:

  • ca.crt - the certificate authority (must be trusted)
  • downstream-rabbitmq.crt - the server certificate
  • downstream-rabbitmq.key - the public key

5. Create RabbitMQ Configuration File

Create file:

%ProgramData%/RabbitMQ/rabbitmq.conf

Add the following settings:

listeners.ssl.default = 5671
ssl_options.cacertfile = C:/Program Files/RabbitMQ Server/certs/ca.crt
ssl_options.certfile   = C:/Program Files/RabbitMQ Server/certs/downstream-rabbitmq.crt
ssl_options.keyfile    = C:/Program Files/RabbitMQ Server/certs/downstream-rabbitmq.key
ssl_options.verify     = verify_none
ssl_options.fail_if_no_peer_cert = false

6. Restart RabbitMQ windows service

The above configuration will not take effect until the service is restarted.

7. Create RabbitMQ Queue

Name data_core.tag_values
Type classic
Durability Durable

NB. The Data Core driver will attempt to create the queue if one is not already defined.

8. Define RabbitMQ Operator Policy (optional)

A policy is a way to dynamically assign properties to queues that are otherwise immutable. We recommend a standard policy that limits how many messages can be buffered and thus avoid the risk of filling the hard-drive. When the limit is hit messages are dropped from the head of the queue.

Select Admin > Policies > Operator Policies > Add /update a policy

Complete the form as follows:

Name Standard Flow
Pattern ^data_core\..*$
Apply To Queues
Priority 2
Max Length (bytes) 1000000000

Source Server Preparation

1. Install RabbitMQ

Refer to RabbitMQ

NB. Include the optional “RabbitMQ Shovel plugin” step.

2. Add Certificate Trust

Create folder:

 C:\Program Files\RabbitMQ Server\certs
 

and add the following certificate files:

  • ca.crt - the certificate authority

3. Create RabbitMQ Configuration File

Create file:

%ProgramData%/RabbitMQ/rabbitmq.conf

Add the following settings:

ssl_options.cacertfile = C:/Program Files/RabbitMQ Server/certs/ca.crt

4. Restart RabbitMQ windows service

The above configuration will not take effect until the service is restarted.

5. Create RabbitMQ Queue

Same as Destination Server Preparation - step 7.

6. Define RabbitMQ Operator Policy (optional)

Same as Destination Server Preparation - step 8.

7. Configure RabbitMQ Shovel

Select Admin > Shovel Management > Add a new shovel

Complete the form as follows:

Name Data Core Tag Values
Source AMQP 0.9.1
Source URI amqp:/ /
Source Queue data_core.tag_values
Prefetch count
Auto-delete Never
Destination AMQP 0.9.1
Destination URI amqps:/ /data_transfer_user:<password>@<servername or ip>
Destination Queue data_core.tag_values
Add forwarding headers No
Reconnect delay
Acknowledgment mode On confirm

Configure Data Core (Source Server)

On the source server, we'll need to configure the following components:

  1. A RabbitMQ producer (writing items onto the queue)
  2. A Data Stream (reading items from a data source to the queue)

Configure App Store Connect (Destination Server)

On the destination server we need to configure:

  1. a RabbitMQ cosumer
  2. an archive (e.g. IPHist)
  3. a data stream reading messages from the queue to the archive.

Fast Data Flow

As well as resilient stream, it may be necessary to configure a fast (near-real-time) flow also. The difference is the resilient flow is responsible for guarenteed flow of history to the destination data archive, but the fast flow will only ever attempt to relay current values.

In situations where a connection is unreliable, this means that dashboard displays will show the latest values as soon as connections are available, rather than be delayed with a catch-up lag.

To implement a fast flow, a parallel stream is configured but with “ephemeral” settings.

Fast Data Flow Queue

Name data_core.tag_values.fast
Type classic
Durability Transient

Fast Data Flow Operator Policy

Name Fast Flow
Pattern ^data_core\..*\.fast$
Apply To Queues
Priority 2
Max Length (bytes) 1000000000
Message TTL (ms) 60000

Fast Data Flow Shovel

Name Fast Data Relay
Source AMQP 0.9.1
Source URI amqp:/ /
Source Queue data_core.tag_values.fast
Prefetch count
Auto-delete Never
Destination AMQP 0.9.1
Destination URI amqps:/ /data_transfer_user:<password>@<servername or ip>
Destination Queue data_core.tag_values.fast
Add forwarding headers No
Reconnect delay (s) 15
Acknowledgment mode No ack
datacore/enable_resilient_data_transfer_with_rabbitmq.txt · Last modified: 2023/08/26 10:42 by su