NUTRINTG (6) File Sender

Short Summary

Input

message from file-zipper-to-file-sender SQS queue pointing to files on s3

Output

  • email notification about the result

  • files on brandOrgCode-specific Salesforce SFTP

Region-awareness

region-independent instance with region-specific mapping in MongoDB

Scalability

possible

Metadata Collections

  • cdpToSfmc_supportedRegions

  • cdpToSfmcFileSender_processedDeltas

  • cdpToSfmcFileSender_sentFiles

Flow Chart

  1. Try to fetch a message from the inbound queue (aws.sqs.fifo-queue-url) every N milliseconds. In production the interval is set by the process.fixed-rate property to 10000 milliseconds. If there is no message, try again after the same interval. There is also an initial delay, set by the process.initial-delay property to 60000 milliseconds in production, which gives the brand org code upload flows time to be created. An example message is below.

    {
      "region" : "amer_usa1",
      "deltaDate" : "20211001",
      "fileSets" : [
        {
          "brandOrgCode" : "MJNDEV1",
          "filePaths" : [ "amer_usa/20211001/MJNDEV1/amer_usa1_MJNDEV1_CDP_Extract_20211001.zip" ]
        },
        {
          "brandOrgCode" : "MJNDEV2",
          "filePaths" : [ "amer_usa/20211001/MJNDEV2/amer_usa1_MJNDEV2_CDP_Extract_20211001.zip" ]
        }
      ]
    }
  2. Check that the message contains at least one file. If it contains none, the message is malformed, so send an error email and finish.

  3. Check whether the delta date in the message is the next expected one, using the following algorithm.

    • Try to retrieve the latest already sent delta from the cdpToSfmcFileSender_processedDeltas collection. The latest document is found by sorting by createdDate. This is an example of a document.

      {
        "_id" : ObjectId("61276062e9d552087ff86224"),
        "region" : "amer_usa1",
        "deltaDate" : "20211001",
        "createdDate" : ISODate("2021-10-01T00:00:00.000Z")
      }
    • If there are no documents, the delta date in the message is considered to be the next expected one.

    • If there is a document, add N days to its deltaDate. N is set by process.delta-interval-in-days and is expected to be 1. If the resulting date does not match the delta date from the message, send an error email and finish.
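The check in step 3 can be sketched in plain Java as follows. This is a minimal illustration, not the service's actual code: the class and method names are hypothetical, and it assumes deltaDate always uses the yyyyMMdd form shown in the examples.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Optional;

// Hypothetical helper; the names are illustrative and not taken from the service.
final class DeltaDateCheck {
    // BASIC_ISO_DATE parses the yyyyMMdd form used by deltaDate, e.g. "20211001".
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.BASIC_ISO_DATE;

    /**
     * @param lastProcessed  deltaDate of the latest processed delta, empty if no document exists
     * @param incoming       deltaDate taken from the queue message
     * @param intervalInDays value of process.delta-interval-in-days (expected to be 1)
     */
    static boolean isNextExpected(Optional<String> lastProcessed, String incoming, int intervalInDays) {
        if (lastProcessed.isEmpty()) {
            // No processed deltas yet: any incoming delta is accepted as the next expected one.
            return true;
        }
        LocalDate expected = LocalDate.parse(lastProcessed.get(), FORMAT).plusDays(intervalInDays);
        return expected.equals(LocalDate.parse(incoming, FORMAT));
    }
}
```

With an interval of 1, a last processed delta of 20211031 accepts only 20211101. A skipped day is rejected, which is the case that produces the error email.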

  4. Retrieve the supported brand org codes from the internal SFTP connection manager (see Brand Org Code Update Flow Chart). If the region from the message is not found there, send an error email and finish. This is an example of a document.

  5. Split the queue message into as many internal messages as it has files.

  6. For each internal message:

    1. Determine whether the file was already sent to the outbound SFTP, based on metadata from the cdpToSfmcFileSender_sentFiles collection.

    2. If it was already sent, create a success internal message.

    3. If it was not sent, submit the file to the appropriate upload flow based on region and brand org code. If there is no such flow, create an error internal message.

    4. Send the file to the outbound SFTP, save metadata into the cdpToSfmcFileSender_sentFiles collection and create an internal message based on the result of the operation. An example document is below.

      {
        "_id" : ObjectId("6149606a985b080cc6b60622"),
        "region" : "amer_usa1",
        "deltaDate" : "20211001",
        "brandOrgCode" : "MJNDEV1",
        "sourcePath" : "amer_usa1/20211001/MJNDEV1/amer_usa1_MJNDEV1_CDP_Extract_20211001.zip",
        "destinationPath" : "Import/amer_usa1_MJNDEV1_CDP_Extract_20211001.zip",
        "createdDate" : ISODate("2021-10-01T00:00:00.000Z")
      }
  7. Aggregate all internal messages back into a single message. If there are any errors, send an error email. If there are no errors, create a new metadata document in the cdpToSfmcFileSender_processedDeltas collection and send a success email.
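Steps 6 and 7 can be sketched in plain Java as below. This is a simplified illustration under stated assumptions: the class is hypothetical, an in-memory set stands in for the cdpToSfmcFileSender_sentFiles collection, the "already sent" lookup is keyed on the source path alone (the real lookup may use more fields), and the SFTP upload itself is reduced to a counter.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch; none of these names come from the service itself.
final class FileSendSketch {
    enum Result { SUCCESS, ERROR }

    // In-memory stand-in for the cdpToSfmcFileSender_sentFiles collection (assumption).
    private final Set<String> sentFiles = new HashSet<>();
    // Keys of the currently registered upload flows, in {region}_{brandOrgCode}_uploadFlow form.
    private final Set<String> uploadFlowKeys;
    // Counts simulated uploads so the skip-if-already-sent behavior is observable.
    int uploads = 0;

    FileSendSketch(Set<String> uploadFlowKeys) {
        this.uploadFlowKeys = uploadFlowKeys;
    }

    /** Processes one internal message, i.e. one file of one brand org code. */
    Result process(String region, String brandOrgCode, String sourcePath) {
        if (sentFiles.contains(sourcePath)) {
            return Result.SUCCESS; // already sent earlier: report success without re-sending
        }
        String flowKey = region + "_" + brandOrgCode + "_uploadFlow";
        if (!uploadFlowKeys.contains(flowKey)) {
            return Result.ERROR; // no upload flow for this region and brand org code
        }
        uploads++;                 // the real SFTP upload would happen here
        sentFiles.add(sourcePath); // record metadata only after the upload
        return Result.SUCCESS;
    }

    /** Aggregation: the delta counts as processed only if every file succeeded. */
    static boolean deltaSucceeded(List<Result> results) {
        // An empty list is treated as a failure; step 2 already rejects messages without files.
        return !results.isEmpty() && !results.contains(Result.ERROR);
    }
}
```

Because already sent files short-circuit to success, re-processing the same message after a partial failure only uploads the files that are still missing.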

Brand Org Code Update Flow Chart

 

  1. This is an independent scheduled process that repeats every N milliseconds. In production the delay is set by the process.upload-flow-update-delay property to 60000 milliseconds. First, retrieve all regions with brand org code-specific SFTP connection details from the cdpToSfmc_supportedRegions collection. Decrypt the SFTP connection details if any properties are encrypted. This is an example of a document.

  2. If any brand org codes have been removed, stop and unregister their upload flows.

  3. If there are any new brand org codes, register and start upload flows for them using {region}_{brandOrgCode}_uploadFlow as the key. If a brand org code already exists but its hashCode value (internal Java implementation) does not match the newly calculated hashCode value, unregister the old flow and register a new one under the same key. This is how flows get updated.

  4. If any error occurs during the process, it is reported in the logs and the whole process starts again after the predefined delay. Watch the logs and fix the issue as soon as possible, because it may block updates of other flows.

  5. If there are no errors, success is reported in the logs and the whole process starts again after the predefined delay.
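The register, replace and unregister decisions in steps 2 and 3 can be sketched as below. This is an illustration only: the class is hypothetical, it tracks just the hash codes rather than real upload flows, and it assumes the hashCode being compared is that of the object holding the SFTP connection details.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the reconciliation logic; not the service's actual registry.
final class UploadFlowRegistry {
    // flow key -> hashCode of the connection details the flow was registered with
    private final Map<String, Integer> registered = new TreeMap<>();

    static String flowKey(String region, String brandOrgCode) {
        return region + "_" + brandOrgCode + "_uploadFlow";
    }

    /**
     * Brings the registry in line with the desired state and returns the actions taken.
     *
     * @param desired flow key -> connection details object (any type with a meaningful hashCode)
     */
    <T> List<String> reconcile(Map<String, T> desired) {
        List<String> actions = new ArrayList<>();
        // Step 2: unregister flows whose brand org code was removed.
        for (String key : new ArrayList<>(registered.keySet())) {
            if (!desired.containsKey(key)) {
                registered.remove(key);
                actions.add("unregister " + key);
            }
        }
        // Step 3: register new flows and replace flows whose details changed.
        for (Map.Entry<String, T> entry : new TreeMap<String, T>(desired).entrySet()) {
            int newHash = entry.getValue().hashCode();
            Integer oldHash = registered.put(entry.getKey(), newHash);
            if (oldHash == null) {
                actions.add("register " + entry.getKey());
            } else if (oldHash != newHash) {
                actions.add("replace " + entry.getKey());
            }
        }
        return actions;
    }
}
```

Calling reconcile twice with unchanged details yields no actions on the second call, so an update cycle does not restart healthy flows. Comparing hash codes is cheap, but two different configurations can in principle share a hash code and go unnoticed.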

Support Tips and Notes

  1. The file sender instance is region-independent, but it uses region-specific documents from the cdpToSfmc_supportedRegions collection. Properties can be found at https://bitbucket.org/rbdigital/spring-boot-cdp-to-sfmc-integration. Production logs can be viewed and downloaded from https://kubernetes-dashboard-production.frankfurt.rbdigitalcloud.com under the namespace cdp-to-sfmc-integration. For access, contact the platform team.

  2. The logs do not report the progress of file sending. Sending a zip of several gigabytes may take several hours. If there are no errors in the logs, the process should still be running.

  3. Error emails in production are currently sent to the cdp.middleware@rb.com mailbox. Check it and read the error message. Errors are usually self-explanatory, for example describing which files were not sent.

  4. To re-process a message, we have to wait for the default visibility timeout, which is set in Terraform, usually to 12 hours. Otherwise, we have to manually create an appropriate inbound message, remove the old message from the inbound queue and submit the new one.

  5. When message processing fails because of an exception, the message is returned to the inbound queue. When re-submitting a message, pay attention to the deduplication id and the message group id. The message group id ensures that messages for the same region are processed in the order in which they were submitted. It is determined by the aws.sqs.outbound-fifo-message-group-id property. The deduplication id makes SQS discard identical messages within the standard SQS deduplication interval of 5 minutes. It is usually created in code based on:

    It can be any value when re-submitting manually.

  6. The correctness of the region-specific document in the cdpToSfmc_supportedRegions collection is crucial. SFTP connection details are encrypted using the appropriate public key for the environment. Any property can be encrypted, but generally we keep the more sensitive data encrypted, e.g. users and passwords. To encrypt or decrypt, you may use https://bitbucket.org/rbdigital/cdd-cryptography-helper or ask the CDD team for assistance.

  7. IMPORTANT! Most of the time the connection to SFMC SFTPs is established using an SSH key. There is only one key pair in use, so to use it, specify the "base64EncodedPrivateKeyPropertyName": "sftp.commonSfmcBase64EncodedPrivateKey" property instead of a password. The key itself is stored in AWS. It can be updated, or new keys can be added, via the SOPS tool in the yaml files in infra/secrets of https://bitbucket.org/rbdigital/spring-boot-cdp-to-sfmc-integration.

  8. IMPORTANT! Do not update a region-specific document in cdpToSfmc_supportedRegions while a sending process is running for that region. This may cause unpredictable behavior.