===== Enable Resilient Data Transfer with RabbitMQ =====

RabbitMQ supports secure and reliable server-to-server communication, making it the ideal utility for Data Core to employ for process data transfer. In the following example we consider a network architecture consisting of a source server and a destination server.

  - A Data Core Node on the source server subscribes to real-time tag values from a SCADA system (via OPC or Modbus).
  - Data Core writes the tag values to a local RabbitMQ queue.
  - RabbitMQ is configured to persist messages, meaning data can survive a server shutdown.
  - RabbitMQ pushes encrypted data (messages) downstream to the RabbitMQ queue on the destination server.
  - Data in transit is encrypted using TLS.
  - The destination RabbitMQ responds with an ack when the message is successfully enqueued.
  - The upstream RabbitMQ server will only dequeue the message on receipt of an ack. //This provides resilience to network outages.//
  - App Store Connect on the destination server reads the message from the queue and archives the tag value to IP Historian.

===== Getting Started =====

Before we can commence with configuring Data Core, we must prepare the servers and RabbitMQ for secure communication.

==== Destination Server Preparation ====

** 1. Install RabbitMQ **

Refer to [[data_core:rabbitmq|RabbitMQ]]

** 2. Configure Firewall Rules **

Firewall rules must be configured on the destination/downstream server to allow incoming messages. Open Windows Defender Firewall and configure two new **Inbound Rules**:

{{ :data_core:rabbitmq_03.png?600 |}}

^ Name ^ Description ^ Port ^
| AMQP | Advanced Message Queuing Protocol - employed by RabbitMQ | 5672 |
| AMQPS | Advanced Message Queuing Protocol Secure - employed by RabbitMQ | 5671 |

The AMQP rule should only be employed for testing and can be disabled when not required. We also recommend restricting access to the above rules by specifying the permitted "Remote IP Address" scope.

** 3. Create a RabbitMQ User **

A new user must be defined on the destination/downstream RabbitMQ server. The source server will use this account for authentication.

  * Open the RabbitMQ Administration UI (http://localhost:15672/)
  * Select Admin > Users
  * Add User
    * Name: data_transfer_user
    * Password:
  * Allow read/write permissions on the required queues

** 4. Create a Self-Signed CA Certificate **

The certificates are used for TLS encryption. Create the folder: C:\Program Files\RabbitMQ Server\certs and add the following certificate files:

  * ca.crt - the certificate authority (must be trusted)
  * downstream-rabbitmq.crt - the server certificate
  * downstream-rabbitmq.key - the server's private key

** 5. Create RabbitMQ Configuration File **

Create the file: %%%%ProgramData%%%%/RabbitMQ/rabbitmq.conf

Add the following settings:

  listeners.ssl.default = 5671
  ssl_options.cacertfile = C:/Program Files/RabbitMQ Server/certs/ca.crt
  ssl_options.certfile = C:/Program Files/RabbitMQ Server/certs/downstream-rabbitmq.crt
  ssl_options.keyfile = C:/Program Files/RabbitMQ Server/certs/downstream-rabbitmq.key
  ssl_options.verify = verify_none
  ssl_options.fail_if_no_peer_cert = false

** 6. Restart the RabbitMQ Windows Service **

The above configuration will not take effect until the service is restarted.

** 7. Create RabbitMQ Queue **

^ Name | data_core.tag_values |
^ Type | classic |
^ Durability | Durable |

NB. The Data Core driver will attempt to create the queue if one is not already defined.

** 8. Define RabbitMQ Operator Policy (optional) **

A policy is a way to dynamically assign properties to queues that are otherwise immutable. We recommend a standard policy that limits how many messages can be buffered, avoiding the risk of filling the hard drive. When the limit is hit, messages are dropped from the head of the queue.
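The certificate files required in step 4 above can be generated with OpenSSL. The sketch below is a minimal example: the subject names ("Example CA", "downstream-rabbitmq") are assumptions and should be replaced so that the server certificate's CN matches the destination server's host name; production deployments may also need subjectAltName extensions.

```shell
# Generate a private key and self-signed certificate for the CA (valid ~10 years)
openssl genrsa -out ca.key 2048
openssl req -x509 -new -key ca.key -sha256 -days 3650 \
    -subj "/CN=Example CA" -out ca.crt

# Generate the server's private key and a certificate signing request
openssl genrsa -out downstream-rabbitmq.key 2048
openssl req -new -key downstream-rabbitmq.key \
    -subj "/CN=downstream-rabbitmq" -out downstream-rabbitmq.csr

# Sign the server certificate with the CA
openssl x509 -req -in downstream-rabbitmq.csr \
    -CA ca.crt -CAkey ca.key -CAcreateserial \
    -sha256 -days 825 -out downstream-rabbitmq.crt

# Confirm the server certificate chains back to the CA
openssl verify -CAfile ca.crt downstream-rabbitmq.crt
```

Copy ca.crt, downstream-rabbitmq.crt, and downstream-rabbitmq.key into the certs folder described above; ca.crt alone is what the source server needs for its certificate trust step.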
Select Admin > Policies > Operator Policies > Add / update a policy

Complete the form as follows:

^ Name | Standard Flow |
^ Pattern | %%^data_core\..*$%% |
^ Apply To | Queues |
^ Priority | 2 |
^ Max Length (bytes) | 1000000000 |

==== Source Server Preparation ====

** 1. Install RabbitMQ **

Refer to [[data_core:rabbitmq|RabbitMQ]]

NB. Include the //optional// "RabbitMQ Shovel plugin" step.

** 2. Add Certificate Trust **

Create the folder: C:\Program Files\RabbitMQ Server\certs and add the following certificate file:

  * ca.crt - the certificate authority

** 3. Create RabbitMQ Configuration File **

Create the file: %%%%ProgramData%%%%/RabbitMQ/rabbitmq.conf

Add the following setting:

  ssl_options.cacertfile = C:/Program Files/RabbitMQ Server/certs/ca.crt

** 4. Restart the RabbitMQ Windows Service **

The above configuration will not take effect until the service is restarted.

** 5. Create RabbitMQ Queue **

Same as Destination Server Preparation - step 7.

** 6. Define RabbitMQ Operator Policy (optional) **

Same as Destination Server Preparation - step 8.

** 7. Configure RabbitMQ Shovel **

Select Admin > Shovel Management > Add a new shovel

Complete the form as follows:

{{ :data_core:rabbitmq_05.png?600 |}}

^ Name | Data Core Tag Values |
^ Source | AMQP 0.9.1 |
^ Source URI | %%amqp://%% |
^ Source Queue | data_core.tag_values |
^ Prefetch count | |
^ Auto-delete | Never |
^ Destination | AMQP 0.9.1 |
^ Destination URI | %%amqps://data_transfer_user:@%% |
^ Destination Queue | data_core.tag_values |
^ Add forwarding headers | No |
^ Reconnect delay | |
^ Acknowledgment mode | On confirm |

==== Configure Data Core (Source Server) ====

On the source server, we'll need to configure the following components:

  - A RabbitMQ producer (writing items onto the queue)
  - A Data Stream (reading items from a data source to the queue)

==== Configure App Store Connect (Destination Server) ====

On the destination server we need to configure:

  - a RabbitMQ consumer
  - an archive (e.g. IPHist)
  - a data stream reading messages from the queue to the archive

===== Fast Data Flow =====

As well as the resilient stream, it may be necessary to configure a fast (near-real-time) flow. The difference is that the resilient flow is responsible for guaranteed delivery of history to the destination data archive, whereas the fast flow only ever attempts to relay current values. Where a connection is unreliable, this means that dashboard displays will show the latest values as soon as a connection is available, rather than being delayed by a catch-up lag. To implement a fast flow, a parallel stream is configured but with "ephemeral" settings.

==== Fast Data Flow Queue ====

^ Name | data_core.tag_values.fast |
^ Type | classic |
^ Durability | Transient |

==== Fast Data Flow Operator Policy ====

^ Name | Fast Flow |
^ Pattern | %%^data_core\..*\.fast$%% |
^ Apply To | Queues |
^ Priority | 2 |
^ Max Length (bytes) | 1000000000 |
^ Message TTL (ms) | 60000 |

==== Fast Data Flow Shovel ====

^ Name | Fast Data Relay |
^ Source | AMQP 0.9.1 |
^ Source URI | %%amqp://%% |
^ Source Queue | data_core.tag_values.fast |
^ Prefetch count | |
^ Auto-delete | Never |
^ Destination | AMQP 0.9.1 |
^ Destination URI | %%amqps://data_transfer_user:@%% |
^ Destination Queue | data_core.tag_values.fast |
^ Add forwarding headers | No |
^ Reconnect delay (s) | 15 |
^ Acknowledgment mode | No ack |
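The behavioural difference between the resilient shovel's "On confirm" mode and the fast shovel's "No ack" mode can be illustrated with a toy Python model. This is a sketch only: in-process queues stand in for the two RabbitMQ servers, and the `confirmed` flag stands in for the AMQP publisher confirm; it is not the RabbitMQ client API.

```python
import queue


def relay_on_confirm(source: queue.Queue, destination: queue.Queue) -> None:
    """Resilient relay: the source dequeues a message only after the
    destination confirms it has been enqueued (at-least-once delivery)."""
    while not source.empty():
        message = source.queue[0]   # peek without removing
        destination.put(message)    # push downstream
        confirmed = True            # toy stand-in for the AMQP publisher confirm
        if confirmed:
            source.get()            # safe to dequeue only after the ack


def relay_no_ack(source: queue.Queue, destination: queue.Queue) -> None:
    """Fast relay: dequeue immediately and fire-and-forget. A network
    outage between get() and put() would lose the message."""
    while not source.empty():
        destination.put(source.get())
```

With "On confirm", a failure between the push and the ack can at worst deliver a message twice, never lose it; with "No ack", the same failure drops the message, which is acceptable for the fast flow because the 60-second Message TTL discards stale values anyway.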