NUTRINTG (4) File Processor
Short Summary
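Input | message from the file-decrypter-to-file-processor SQS queue pointing to files on S3 |
---|---|
Output | split files on S3 and a message to the outbound SQS queue |
Region-awareness | region-independent instance with region-specific mapping in MongoDB |
Scalability | possible |
Metadata Collections | cdpToSfmc_supportedRegions, cdpToSfmcFileProcessor_processedDeltas, cdpToSfmcFileProcessor_processedFiles |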
Flow Chart
Try to fetch a message from the inbound queue (aws.sqs.inbound-fifo-queue-url) every N milliseconds. On production, N is determined by the process.fixed-delay property and is set to 10000 milliseconds. If there is no message, try again after the same delay. A message example is below.
{ "region" : "amer_usa1", "deltaDate" : "20211001", "filePaths" : [ "amer_usa/20211001/rb_amer_usa_order_20211001120000.dat", "amer_usa/20211001/rb_amer_usa_customer_20211001120000.dat" ] }
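The polling behaviour described above can be sketched as follows. This is only an illustration, not the service's actual code: `poll`, `fetch_message` and `handle_message` are hypothetical names, and the queue access is passed in as a plain callable so that the sketch does not depend on any AWS client.

```python
import json
import time

# Value of process.fixed-delay on production, as stated in the text above.
FIXED_DELAY_MS = 10000


def poll(fetch_message, handle_message, max_attempts, sleep=time.sleep):
    """Try to fetch a message max_attempts times, waiting a fixed delay after each try.

    fetch_message (hypothetical) returns the raw JSON body or None if the queue is empty.
    handle_message (hypothetical) receives the parsed message as a dict.
    Returns the number of messages that were handled.
    """
    handled = 0
    for _ in range(max_attempts):
        raw = fetch_message()
        if raw is not None:
            handle_message(json.loads(raw))
            handled += 1
        # Whether or not a message was found, wait before the next attempt.
        sleep(FIXED_DELAY_MS / 1000)
    return handled
```

The real service would run indefinitely; `max_attempts` exists here only to keep the sketch easy to try out.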
If there is a message, check whether the delta date in the message is the next expected one, using the following algorithm.
Try to retrieve the latest already processed delta from cdpToSfmcFileProcessor_processedDeltas collection. Retrieval is done via sorting by createdDate. This is an example of a document.
{ "_id" : ObjectId("61276062e9d552087ff86224"), "region" : "amer_usa1", "deltaDate" : "20211001", "createdDate" : ISODate("2021-10-01T00:00:00.000Z") }
If there are no documents, then delta date in the message is considered to be the next expected.
If there is a document, add N days to its deltaDate. N is determined by the process.delta-interval-in-days property and is supposed to be set to 1. If the resulting delta date does not match the delta date from the message, send an error email and finish.
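The next-expected-delta check above can be illustrated with a minimal sketch. It assumes that deltaDate always has the yyyyMMdd form shown in the examples; `is_next_expected` is a hypothetical name, and `latest_processed` stands for the document retrieved from cdpToSfmcFileProcessor_processedDeltas (or None if there is none).

```python
from datetime import datetime, timedelta

# Assumption: deltaDate is formatted as yyyyMMdd, as in the examples above.
DELTA_FORMAT = "%Y%m%d"


def is_next_expected(message_delta, latest_processed, interval_days=1):
    """Return True if message_delta is the next expected delta date.

    interval_days corresponds to process.delta-interval-in-days.
    """
    if latest_processed is None:
        # No processed deltas yet: any delta date is accepted.
        return True
    last = datetime.strptime(latest_processed["deltaDate"], DELTA_FORMAT)
    expected = last + timedelta(days=interval_days)
    return expected.strftime(DELTA_FORMAT) == message_delta
```

For example, with the latest processed delta 20211001 and an interval of 1 day, only a message with deltaDate 20211002 passes the check.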
Retrieve all supported brand org codes and entities for the given region from cdpToSfmc_supportedRegions collection. If there is no region in the database, send an error email and finish. This is an example of a document.
{ "region": "amer_usa1", "archiveNameTemplate": "{cdpRegion}_{brandOrgCode}_CDP_Extract_{deltaDate}.zip", "supportedFiles": [ { "name": "order", "headersInOrder": [ "BRAND_ORG_CODE", "NUMBER", "DISCOUNT_CODE" ], "separatorFieldPosition": NumberInt(0), "sourceFileSeparator": "|", "targetFileSeparator": "," }, { "name": "customer", "headersInOrder": [ "NAME", "LAST_NAME", "BRAND_ORG_CODE" ], "separatorFieldPosition": NumberInt(2), "sourceFileSeparator": "|", "targetFileSeparator": "," } ], "supportedBrandOrgCodes": [ { "brandOrgCode": "MJNDEV1", "user": "ENC(Mem7m5az7wp2wlaEzp7jEQ96hXfS7EVLXDqipnuTXcLUq9DHjIfMG9ChNoFUwmj498riWe3TXxxAUN7QGRFM97s3KrRi01oX2S3Jd3+dZXxw5PKsCB0RE0hWJhMChgqkhFDhnYC+NjbZA0Wt5KzKfuJwTlwQLdncztnMm6yD63C5jzKsiMuqzI2qWXD5+RKEuQSv6LQ+6MNVTfe+pIhcA4PStR4KE4oDy++bMKp48Od+L6Hez5f8QGEyrVcIc+Eu7YuxbU7t51WMavOQbNJ0n+HwqeE9gpXGxoNsxN/oLWFXYrgZEgythIzo/zh8WAdGa+TQOBHLU5ZJ8hB12p+/aPHiJMI1OAtxf74C4NyVXOHKN5XIZPD9q/DcX4Gx7QxE6tPMWEUJR/q00vxchWZnyj2zOgvulrPbmXHJZySLad1KZbd3mlJBWsBcqC3IlPHB3ezzI4C04van1TAcs2I7ju1mSIhoBVgvVuYflmu7yJ6QN2uMDLelJTEc4nHF2WqQIEIRa930/9szBfIWNOa1JIMgTur172T888Mj5TkLY+0KvihLLLI69BVRBSFcIA/atcXSigxaVffs19ZUUDU+ZL3aRUW0Mpyu2NLvlTGpG7WLR3uHEt8wXd6F4X17lNWRZ2WUSxQGfkm4Ocy0Jys5ddIpAs14yZebTVGfL8Bc7sw=)", "password": "ENC(FBQ2ohzprGoHW4XePjgucHi1K/2ZvMXhSp0ptBS38U69DwJxdrWZzRcGF+rMxlGH3sESwCStTax8Jmgz/DE7NPURve8QgDHxUWqcowJMqcShd14ZF2a9e6lDB0Iy6nsC6YZrF41IYZx1yWmnsqo9HUOCULs7GaVdBDNeS+oXCRydqP97WE9omFhdYlvvVpV59mpE85uPsKhUUjhCpwxD1neCd6FusY2hvuot03S/hHbPG/6QXR8fHydrP1g4scBl+c24Sz4dW5N+BawoRXYXVTWAQOJy8RcLnftijxxhR4QOWLp2otMcqKiBCdrDaTBr8VgjUbe8UmPl3my/7WiqAgcwWnXJ+YzgRFXyFjIdgGWeV2ak9ih51KR8jxeOTpBpDQqNUx3qbckdlQs28ib5clwKljBTGBUHH0Rtgn8LMkNk3nSjl/ectXAjbQqxNdLEC//piGrKQQrI0Vr4xCf9YG9bpBy51+hdF6MsFi1SMNI2TtTKacnLgtUIl717PRyxZ1nIHrWPe3sP78ygt5KswNXp+yfMGVepOcSZERT8jyHCbYrec+f7xbzi03mc058awVFnIKZuEIg5Jtat1htTupTS3yGCUVss/cOr081SlvNnzFFu+4Wy5DSDZ+uEv5ys2cgGFnSdge44r4KpHH3VBnarOmD/CAsxkyYDkcvcFPc=)", "host": "localhost", "port": "2222", 
"remoteDirectory": "sftp/test" }, { "brandOrgCode": "MJNDEV2", "user": "ENC(Mem7m5az7wp2wlaEzp7jEQ96hXfS7EVLXDqipnuTXcLUq9DHjIfMG9ChNoFUwmj498riWe3TXxxAUN7QGRFM97s3KrRi01oX2S3Jd3+dZXxw5PKsCB0RE0hWJhMChgqkhFDhnYC+NjbZA0Wt5KzKfuJwTlwQLdncztnMm6yD63C5jzKsiMuqzI2qWXD5+RKEuQSv6LQ+6MNVTfe+pIhcA4PStR4KE4oDy++bMKp48Od+L6Hez5f8QGEyrVcIc+Eu7YuxbU7t51WMavOQbNJ0n+HwqeE9gpXGxoNsxN/oLWFXYrgZEgythIzo/zh8WAdGa+TQOBHLU5ZJ8hB12p+/aPHiJMI1OAtxf74C4NyVXOHKN5XIZPD9q/DcX4Gx7QxE6tPMWEUJR/q00vxchWZnyj2zOgvulrPbmXHJZySLad1KZbd3mlJBWsBcqC3IlPHB3ezzI4C04van1TAcs2I7ju1mSIhoBVgvVuYflmu7yJ6QN2uMDLelJTEc4nHF2WqQIEIRa930/9szBfIWNOa1JIMgTur172T888Mj5TkLY+0KvihLLLI69BVRBSFcIA/atcXSigxaVffs19ZUUDU+ZL3aRUW0Mpyu2NLvlTGpG7WLR3uHEt8wXd6F4X17lNWRZ2WUSxQGfkm4Ocy0Jys5ddIpAs14yZebTVGfL8Bc7sw=)", "base64EncodedPrivateKeyPropertyName": "sftp.commonSfmcBase64EncodedPrivateKey", "host": "localhost", "port": "2223", "remoteDirectory": "sftp/test" } ] }
Retrieve all entities from the file names in the queue message based on the file.baseNamePattern property. On production it is set to rb_(amer(_usa)?|eur|sea)_(?<entity>.*)_[0-9]{14}. Check them against the list of supported entities for the region retrieved in the previous step. If the entities do not match, send an error email and finish.
Fetch metadata about already processed and split files for the given region and delta date from the cdpToSfmcFileProcessor_processedFiles collection. This comes in handy when the message was previously in process and was returned to the queue (i.e. some files have been processed and split but some have failed): already processed files are not processed and split again. Please note that S3 is not checked, so split files manually deleted from S3 will cause issues in File Zipper.
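The entity check described above can be sketched as follows. This is not the service's code: the production pattern uses Java named-group syntax `(?<entity>...)`, which is written `(?P<entity>...)` in Python. The sketch also assumes that the pattern is applied to the file name without directory and extension, and that "do not match" means a file whose entity is missing from supportedFiles. The function names are hypothetical.

```python
import posixpath
import re

# Production value of file.baseNamePattern, translated to Python named-group syntax.
BASE_NAME_PATTERN = re.compile(r"rb_(amer(_usa)?|eur|sea)_(?P<entity>.*)_[0-9]{14}")


def extract_entities(file_paths):
    """Map each file path to the entity encoded in its base name."""
    entities = {}
    for path in file_paths:
        # Assumption: the pattern is matched against the name without extension.
        base_name = posixpath.splitext(posixpath.basename(path))[0]
        match = BASE_NAME_PATTERN.fullmatch(base_name)
        if match is None:
            raise ValueError("file name does not match the pattern: " + path)
        entities[path] = match.group("entity")
    return entities


def unsupported_entities(file_paths, region_document):
    """Return the entities from the message that the region document does not list."""
    supported = {f["name"] for f in region_document["supportedFiles"]}
    found = set(extract_entities(file_paths).values())
    return sorted(found - supported)
```

For the message example above, the extracted entities are order and customer, both of which are listed in the example region document.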
Split the queue message into as many internal messages as there are files in it.
Split files by brand org codes and write the split files to S3 in multiple threads. The number of threads is determined by the process.processing-thread-pool-size property. For example, if it is set to 8 but there are 15 files, processing starts immediately for 8 files, and the other 7 are picked up as threads become free. Headers of each processed file are verified against the list of headers from the database. If they do not match, splitting of the file finishes with an error. If a file contains no data for a supported brand org code, an empty split file with headers is created for that code. If there is an unsupported brand org code in a file, splitting of this file finishes with an error. Result files are written to S3 into the directory determined by the path of the encrypted files plus the brand org code. The date part of the file name is removed. For example, the amer_usa/20211001/rb_amer_usa_order_20211001120000.dat file will be split into amer_usa/20211001/MJNDEV1/rb_amer_usa_order.dat and amer_usa/20211001/MJNDEV2/rb_amer_usa_order.dat. In this step, metadata for processed and split files is also created in the cdpToSfmcFileProcessor_processedFiles collection. An example of a document is below.
Aggregate all internal messages into a single one again. If there are any errors, send an error email. If there are no errors, send a message to the outbound queue (aws.sqs.outbound-fifo-queue-url), create a new metadata document in the cdpToSfmcFileProcessor_processedDeltas collection and send a success email. An example of a message is below.
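To illustrate the file-splitting step described above (this is a sketch of that step, not an example of the outbound message), the target path and the per-brand split could look roughly like this. It is an assumption of the sketch that the first line of a file is the header row, that split files use targetFileSeparator, and that fields contain no quoted or escaped separators; the function names are hypothetical.

```python
import posixpath
import re


def split_target_path(source_path, brand_org_code):
    """Build the S3 key of a split file: source directory + brand org code + name without date part."""
    directory, file_name = posixpath.split(source_path)
    base, extension = posixpath.splitext(file_name)
    base = re.sub(r"_[0-9]{14}$", "", base)  # drop the trailing 14-digit date part
    return posixpath.join(directory, brand_org_code, base + extension)


def split_by_brand_org_code(lines, file_config, supported_codes):
    """Split the lines of one file into one list of lines per supported brand org code.

    file_config is one element of supportedFiles from the region document.
    """
    source_separator = file_config["sourceFileSeparator"]
    target_separator = file_config["targetFileSeparator"]
    position = file_config["separatorFieldPosition"]
    headers = file_config["headersInOrder"]

    # Assumption: the first line of the source file is the header row.
    if not lines or lines[0].split(source_separator) != headers:
        raise ValueError("headers do not match the region document")

    # Every supported code gets a file, even if it ends up with headers only.
    result = {code: [target_separator.join(headers)] for code in supported_codes}
    for line in lines[1:]:
        fields = line.split(source_separator)
        code = fields[position]
        if code not in result:
            raise ValueError("unsupported brand org code: " + code)
        result[code].append(target_separator.join(fields))
    return result
```

In the service, each file is handled by a thread from a pool of process.processing-thread-pool-size threads; the sketch shows only the work done for a single file.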
Support Tips and Notes
File processor instance is region-independent. It uses region-specific documents from cdpToSfmc_supportedRegions collection though. Properties can be accessed under https://bitbucket.org/rbdigital/spring-boot-cdp-to-sfmc-integration. Production logs can be observed and downloaded from https://kubernetes-dashboard-production.frankfurt.rbdigitalcloud.com under namespace cdp-to-sfmc-integration. For access, the platform team has to be contacted.
Error emails on production are currently sent to the cdp.middleware@rb.com mailbox. Check it to see what the error is. Errors are usually self-explanatory and describe, for example, which files were not processed.
To re-process a message, we have to wait for the default visibility timeout to expire; it is set in Terraform, usually to 12 hours. Alternatively, we have to manually create an appropriate inbound message, remove the old message from the inbound queue and submit the new one.
When message processing fails because of an exception, the message is returned to the inbound queue. When re-submitting a message, pay attention to the deduplication id and the message group id. The message group id ensures that messages for the same region are processed in the order they were submitted. It is determined by the aws.sqs.outbound-fifo-message-group-id property. The deduplication id makes SQS discard identical messages within the standard SQS deduplication interval of 5 minutes. It is usually created in code; when re-submitting manually, it can be any value.
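As an illustration of manual re-submission, the sketch below prepares the arguments for sending a message to a FIFO queue. The keyword names follow the boto3 SQS client's `send_message` call; the function name is hypothetical, and using a random UUID as the deduplication id is an assumption of this sketch, not how the service generates it.

```python
import json
import uuid


def build_resubmit_kwargs(queue_url, message_group_id, region, delta_date, file_paths):
    """Prepare keyword arguments for re-submitting an inbound message to a FIFO queue."""
    body = {"region": region, "deltaDate": delta_date, "filePaths": list(file_paths)}
    return {
        "QueueUrl": queue_url,
        "MessageBody": json.dumps(body),
        # Messages with the same group id are delivered in the order they were sent.
        "MessageGroupId": message_group_id,
        # Assumption: a fresh random id, so SQS does not discard the message as a duplicate.
        "MessageDeduplicationId": str(uuid.uuid4()),
    }
```

With boto3 installed and credentials configured, the result could be passed on as `boto3.client("sqs").send_message(**kwargs)`. The queue URL and message group id have to be taken from the environment's properties.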
The correctness of the region-specific document in the cdpToSfmc_supportedRegions collection is crucial. Headers have to be specified in order for each entity; the order of the entities is not important. Separators and the position of the brand org code column (counting from 0) have to be specified correctly for each file too. The document is fetched before every processor run, so changes take effect immediately.
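Before changing a region document, a quick sanity check along the following lines may help. It is a hypothetical helper, not part of the service, and it assumes that the brand org code column is always named BRAND_ORG_CODE, as in the example document above.

```python
# Assumption: the brand org code column is named as in the example document.
BRAND_ORG_CODE_HEADER = "BRAND_ORG_CODE"


def validate_region_document(document):
    """Return a list of human-readable problems found in a region document."""
    problems = []
    for file_config in document.get("supportedFiles", []):
        name = file_config.get("name", "<unnamed>")
        headers = file_config.get("headersInOrder", [])
        position = file_config.get("separatorFieldPosition")

        # The position is zero-based and must point at the brand org code column.
        if not isinstance(position, int) or not 0 <= position < len(headers):
            problems.append(name + ": separatorFieldPosition is out of range")
        elif headers[position] != BRAND_ORG_CODE_HEADER:
            problems.append(name + ": separatorFieldPosition does not point at " + BRAND_ORG_CODE_HEADER)

        # Both separators have to be present and non-empty.
        for key in ("sourceFileSeparator", "targetFileSeparator"):
            if not file_config.get(key):
                problems.append(name + ": " + key + " is missing")
    return problems
```

An empty list means that no problems were found by these particular checks; it does not guarantee that the headers match the actual files.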