NUTRINTG (5) File Zipper
Short Summary
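Input | message from the file-processor-to-file-zipper SQS queue pointing to files on S3 |
---|---|
Output | zip archives on S3 and a message to the outbound SQS queue |
Region-awareness | region-independent instance with region-specific mapping in MongoDB |
Scalability | possible |
Metadata Collections | cdpToSfmcFileZipper_processedDeltas, cdpToSfmc_supportedRegions |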
Flow Chart
Poll the inbound queue (aws.sqs.inbound-fifo-queue-url) for a message with a fixed delay set by the process.fixed-delay property, which is 10000 milliseconds (10 seconds) on production. If there is no message, try again after the same delay. Message example:
{ "region" : "amer_usa1", "deltaDate" : "20211001", "fileSets" : [ { "brandOrgCode" : "MJNDEV1", "filePaths": [ "amer_usa/20211001/MJNDEV1/rb_amer_usa_order.dat", "amer_usa/20211001/MJNDEV1/rb_amer_usa_customer.dat" ] }, { "brandOrgCode" : "MJNDEV2", "filePaths": [ "amer_usa/20211001/MJNDEV2/rb_amer_usa_order.dat", "amer_usa/20211001/MJNDEV2/rb_amer_usa_customer.dat" ] } ] }
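The polling step can be sketched with a plain ScheduledExecutorService. This is an illustration under assumptions, not the service's code: the real application is Spring Boot and may use Spring's scheduling support instead, and InboundPoller, receiveOne and handler are hypothetical names (receiveOne stands in for an SQS receive call on aws.sqs.inbound-fifo-queue-url).

```java
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical poller: receiveOne stands in for an SQS receive call,
// handler for the rest of the processing flow.
final class InboundPoller {
    private final Supplier<Optional<String>> receiveOne;
    private final Consumer<String> handler;

    InboundPoller(Supplier<Optional<String>> receiveOne, Consumer<String> handler) {
        this.receiveOne = receiveOne;
        this.handler = handler;
    }

    /** One polling attempt; returns true if a message was received and handled. */
    boolean pollOnce() {
        Optional<String> message = receiveOne.get();
        message.ifPresent(handler);
        return message.isPresent();
    }

    /**
     * Runs pollOnce() with a fixed delay (process.fixed-delay, 10000 ms on production):
     * each attempt starts fixedDelayMs after the previous attempt has finished.
     */
    ScheduledExecutorService start(long fixedDelayMs) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                pollOnce();
            } catch (RuntimeException e) {
                // An uncaught exception would suppress all later runs, so report it and keep polling.
                System.err.println("Polling failed: " + e);
            }
        }, 0, fixedDelayMs, TimeUnit.MILLISECONDS);
        return scheduler;
    }
}
```

With a fixed delay, the 10-second pause is measured from the end of the previous attempt, so a long-running message does not cause polls to pile up.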
If there is a message, check whether its delta date is the next expected one, using the following algorithm.
Retrieve the latest already zipped delta from the cdpToSfmcFileZipper_processedDeltas collection, i.e. the document with the newest createdDate. Example document:
{ "_id" : ObjectId("61276062e9d552087ff86224"), "region" : "amer_usa1", "deltaDate" : "20211001", "createdDate" : ISODate("2021-10-01T00:00:00.000Z") }
If there are no documents, the delta date in the message is considered the next expected one.
If there is a document, add N days to its deltaDate, where N is set by the process.delta-interval-in-days property and is expected to be 1. If the resulting date does not match the delta date in the message, send an error email and stop processing.
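The delta date check above can be sketched as follows. It assumes deltaDate is always in yyyyMMdd form (as in the examples) and that the candidate processedDeltas documents are already loaded into a list; the real service queries MongoDB instead, and whether that query is also filtered by region is not stated here. ProcessedDelta and DeltaDateCheck are hypothetical names.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Mirrors the fields of a cdpToSfmcFileZipper_processedDeltas document.
record ProcessedDelta(String region, String deltaDate, Instant createdDate) {}

final class DeltaDateCheck {
    // deltaDate values such as "20211001" are yyyyMMdd, i.e. DateTimeFormatter.BASIC_ISO_DATE.
    private static final DateTimeFormatter DELTA_FORMAT = DateTimeFormatter.BASIC_ISO_DATE;

    /** The latest already zipped delta: the document with the newest createdDate. */
    static Optional<ProcessedDelta> latest(List<ProcessedDelta> processed) {
        return processed.stream().max(Comparator.comparing(ProcessedDelta::createdDate));
    }

    /** True if messageDeltaDate is the next expected delta date. */
    static boolean isNextExpected(List<ProcessedDelta> processed, String messageDeltaDate,
                                  int deltaIntervalInDays) {
        Optional<ProcessedDelta> last = latest(processed);
        if (last.isEmpty()) {
            return true; // no documents yet: the message's delta date is accepted as the next one
        }
        // Latest deltaDate plus process.delta-interval-in-days (expected to be 1).
        LocalDate expected = LocalDate.parse(last.get().deltaDate(), DELTA_FORMAT)
                .plusDays(deltaIntervalInDays);
        return expected.equals(LocalDate.parse(messageDeltaDate, DELTA_FORMAT));
    }
}
```

Parsing into LocalDate rather than incrementing the string keeps month and year boundaries correct (20211031 is followed by 20211101).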
Retrieve the archive name template for the given region from the cdpToSfmc_supportedRegions collection. If the region is not present in the database, send an error email and stop processing. Example document:
{ "region": "amer_usa1", "archiveNameTemplate": "{cdpRegion}_{brandOrgCode}_CDP_Extract_{deltaDate}.zip", "supportedFiles": [ { "name": "order", "headersInOrder": [ "BRAND_ORG_CODE", "NUMBER", "DISCOUNT_CODE" ], "separatorFieldPosition": NumberInt(0), "sourceFileSeparator": "|", "targetFileSeparator": "," }, { "name": "customer", "headersInOrder": [ "NAME", "LAST_NAME", "BRAND_ORG_CODE" ], "separatorFieldPosition": NumberInt(2), "sourceFileSeparator": "|", "targetFileSeparator": "," } ], "supportedBrandOrgCodes": [ { "brandOrgCode": "MJNDEV1", "user": "ENC(Mem7m5az7wp2wlaEzp7jEQ96hXfS7EVLXDqipnuTXcLUq9DHjIfMG9ChNoFUwmj498riWe3TXxxAUN7QGRFM97s3KrRi01oX2S3Jd3+dZXxw5PKsCB0RE0hWJhMChgqkhFDhnYC+NjbZA0Wt5KzKfuJwTlwQLdncztnMm6yD63C5jzKsiMuqzI2qWXD5+RKEuQSv6LQ+6MNVTfe+pIhcA4PStR4KE4oDy++bMKp48Od+L6Hez5f8QGEyrVcIc+Eu7YuxbU7t51WMavOQbNJ0n+HwqeE9gpXGxoNsxN/oLWFXYrgZEgythIzo/zh8WAdGa+TQOBHLU5ZJ8hB12p+/aPHiJMI1OAtxf74C4NyVXOHKN5XIZPD9q/DcX4Gx7QxE6tPMWEUJR/q00vxchWZnyj2zOgvulrPbmXHJZySLad1KZbd3mlJBWsBcqC3IlPHB3ezzI4C04van1TAcs2I7ju1mSIhoBVgvVuYflmu7yJ6QN2uMDLelJTEc4nHF2WqQIEIRa930/9szBfIWNOa1JIMgTur172T888Mj5TkLY+0KvihLLLI69BVRBSFcIA/atcXSigxaVffs19ZUUDU+ZL3aRUW0Mpyu2NLvlTGpG7WLR3uHEt8wXd6F4X17lNWRZ2WUSxQGfkm4Ocy0Jys5ddIpAs14yZebTVGfL8Bc7sw=)", "password": "ENC(FBQ2ohzprGoHW4XePjgucHi1K/2ZvMXhSp0ptBS38U69DwJxdrWZzRcGF+rMxlGH3sESwCStTax8Jmgz/DE7NPURve8QgDHxUWqcowJMqcShd14ZF2a9e6lDB0Iy6nsC6YZrF41IYZx1yWmnsqo9HUOCULs7GaVdBDNeS+oXCRydqP97WE9omFhdYlvvVpV59mpE85uPsKhUUjhCpwxD1neCd6FusY2hvuot03S/hHbPG/6QXR8fHydrP1g4scBl+c24Sz4dW5N+BawoRXYXVTWAQOJy8RcLnftijxxhR4QOWLp2otMcqKiBCdrDaTBr8VgjUbe8UmPl3my/7WiqAgcwWnXJ+YzgRFXyFjIdgGWeV2ak9ih51KR8jxeOTpBpDQqNUx3qbckdlQs28ib5clwKljBTGBUHH0Rtgn8LMkNk3nSjl/ectXAjbQqxNdLEC//piGrKQQrI0Vr4xCf9YG9bpBy51+hdF6MsFi1SMNI2TtTKacnLgtUIl717PRyxZ1nIHrWPe3sP78ygt5KswNXp+yfMGVepOcSZERT8jyHCbYrec+f7xbzi03mc058awVFnIKZuEIg5Jtat1htTupTS3yGCUVss/cOr081SlvNnzFFu+4Wy5DSDZ+uEv5ys2cgGFnSdge44r4KpHH3VBnarOmD/CAsxkyYDkcvcFPc=)", "host": "localhost", "port": "2222", 
"remoteDirectory": "sftp/test" }, { "brandOrgCode": "MJNDEV2", "user": "ENC(Mem7m5az7wp2wlaEzp7jEQ96hXfS7EVLXDqipnuTXcLUq9DHjIfMG9ChNoFUwmj498riWe3TXxxAUN7QGRFM97s3KrRi01oX2S3Jd3+dZXxw5PKsCB0RE0hWJhMChgqkhFDhnYC+NjbZA0Wt5KzKfuJwTlwQLdncztnMm6yD63C5jzKsiMuqzI2qWXD5+RKEuQSv6LQ+6MNVTfe+pIhcA4PStR4KE4oDy++bMKp48Od+L6Hez5f8QGEyrVcIc+Eu7YuxbU7t51WMavOQbNJ0n+HwqeE9gpXGxoNsxN/oLWFXYrgZEgythIzo/zh8WAdGa+TQOBHLU5ZJ8hB12p+/aPHiJMI1OAtxf74C4NyVXOHKN5XIZPD9q/DcX4Gx7QxE6tPMWEUJR/q00vxchWZnyj2zOgvulrPbmXHJZySLad1KZbd3mlJBWsBcqC3IlPHB3ezzI4C04van1TAcs2I7ju1mSIhoBVgvVuYflmu7yJ6QN2uMDLelJTEc4nHF2WqQIEIRa930/9szBfIWNOa1JIMgTur172T888Mj5TkLY+0KvihLLLI69BVRBSFcIA/atcXSigxaVffs19ZUUDU+ZL3aRUW0Mpyu2NLvlTGpG7WLR3uHEt8wXd6F4X17lNWRZ2WUSxQGfkm4Ocy0Jys5ddIpAs14yZebTVGfL8Bc7sw=)", "base64EncodedPrivateKeyPropertyName": "sftp.commonSfmcBase64EncodedPrivateKey", "host": "localhost", "port": "2223", "remoteDirectory": "sftp/test" } ] }
Split the queue message into as many internal messages as there are files in it.
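A sketch of the split, taking the description literally (one internal message per file path) and keeping the brand org code on each internal message so the results can be grouped by it again later. The record names are hypothetical and simply mirror the fields of the inbound message example.

```java
import java.util.List;

// Hypothetical records mirroring the inbound message example.
record FileSet(String brandOrgCode, List<String> filePaths) {}
record InboundMessage(String region, String deltaDate, List<FileSet> fileSets) {}

// Hypothetical internal message: one per file path, carrying its brand org code.
record InternalMessage(String region, String deltaDate, String brandOrgCode, String filePath) {}

final class MessageSplitter {
    /** Flattens all file sets into one internal message per file, preserving order. */
    static List<InternalMessage> split(InboundMessage msg) {
        return msg.fileSets().stream()
                .flatMap(fs -> fs.filePaths().stream()
                        .map(path -> new InternalMessage(msg.region(), msg.deltaDate(), fs.brandOrgCode(), path)))
                .toList();
    }
}
```

For the example message above this yields four internal messages, two for MJNDEV1 followed by two for MJNDEV2.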
Zip the files and write the zips to S3 in multiple threads. The number of threads is determined by the process.parallel-zip-threads property. For example, if it is set to 8 and there are 15 files, zipping starts immediately for 8 of them, and the other 7 are picked up later as threads become free. The upload of each zip file is further parallelized using the process.num-upload-threads, process.queue-capacity and process.part-size-in-mb properties. If a particular zip file already exists on S3, zipping for that brand org code is skipped and considered successful. Zip names are built from the archive name template by replacing its placeholders, and the zips are written alongside the split files, for example amer_usa/20211001/MJNDEV1/amer_usa1_MJNDEV1_CDP_Extract_20211001.zip.
Aggregate all internal messages into a single one again. If there are any errors, send an error email. If there are no errors, send a message to the outbound queue (aws.sqs.outbound-fifo-queue-url), create a new metadata document in the cdpToSfmcFileZipper_processedDeltas collection and send a success email.
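The parallel zipping and aggregation steps can be sketched with a fixed thread pool from java.util.concurrent and java.util.zip. This is a minimal sketch under assumptions: files are grouped by brand org code (one zip per brand org code, as the zip names suggest), file contents fit in memory, and ParallelZipper, ZipOutcome and readFile are hypothetical names (readFile stands in for reading a file from S3). The S3 existence check, the multipart upload and the emails are left out.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

final class ParallelZipper {
    /** Aggregated result: the zips that were built and the errors per brand org code. */
    record ZipOutcome(Map<String, byte[]> zips, Map<String, String> errors) {
        boolean success() {
            return errors.isEmpty();
        }
    }

    /** Builds one zip in memory from the given files. */
    static byte[] zip(List<String> filePaths, Function<String, byte[]> readFile) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ZipOutputStream zip = new ZipOutputStream(bytes)) {
            for (String path : filePaths) {
                // Each entry is named after the file's base name, e.g. rb_amer_usa_order.dat.
                zip.putNextEntry(new ZipEntry(path.substring(path.lastIndexOf('/') + 1)));
                zip.write(readFile.apply(path));
                zip.closeEntry();
            }
        }
        return bytes.toByteArray();
    }

    /**
     * Zips each brand org code's files on a fixed pool of parallelZipThreads threads
     * (process.parallel-zip-threads); jobs beyond the pool size wait in the pool's queue
     * until a thread is free. Then waits for every job and aggregates the results.
     */
    static ZipOutcome zipAll(Map<String, List<String>> filesByBrand,
                             Function<String, byte[]> readFile,
                             int parallelZipThreads) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(parallelZipThreads);
        try {
            Map<String, Future<byte[]>> futures = new LinkedHashMap<>();
            filesByBrand.forEach((brand, files) -> futures.put(brand, pool.submit(() -> zip(files, readFile))));

            Map<String, byte[]> zips = new LinkedHashMap<>();
            Map<String, String> errors = new LinkedHashMap<>();
            for (Map.Entry<String, Future<byte[]>> job : futures.entrySet()) {
                try {
                    zips.put(job.getKey(), job.getValue().get());
                } catch (ExecutionException e) {
                    // A failed job becomes an error entry instead of aborting the others.
                    errors.put(job.getKey(), String.valueOf(e.getCause()));
                }
            }
            return new ZipOutcome(zips, errors);
        } finally {
            pool.shutdown();
        }
    }
}
```

With 8 threads and 15 jobs, Executors.newFixedThreadPool starts 8 jobs at once and queues the remaining 7, which matches the behaviour described for process.parallel-zip-threads. A non-empty errors map corresponds to the error email branch; an empty one to the outbound message, metadata document and success email.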
Support Tips and Notes
The File Zipper instance is region-independent, although it uses region-specific documents from the cdpToSfmc_supportedRegions collection. Properties can be found at https://bitbucket.org/rbdigital/spring-boot-cdp-to-sfmc-integration. Production logs can be viewed and downloaded from https://kubernetes-dashboard-production.frankfurt.rbdigitalcloud.com in the cdp-to-sfmc-integration namespace. To get access, contact the platform team.
On production, error emails are currently sent to the cdp.middleware@rb.com mailbox; check it to read the error. Errors are usually self-explanatory, for example listing the files that were not zipped.
To re-process a message, we have to wait for the queue's default visibility timeout to expire; it is set in Terraform, usually to 12 hours. Otherwise, we have to manually create an appropriate inbound message, delete the old message from the inbound queue and submit the new one.
When message processing fails because of an exception, the message is returned to the inbound queue. When re-submitting a message, pay attention to the deduplication ID and the message group ID. The message group ID ensures that messages for the same region are processed in the order in which they were submitted; it is determined by the aws.sqs.outbound-fifo-message-group-id property. The deduplication ID causes identical messages to be discarded within the 5-minute SQS FIFO deduplication interval. It is usually generated in code based on:
When re-submitting manually, it can be any value.
The correctness of the region-specific document in the cdpToSfmc_supportedRegions collection is crucial. The archive name template has to use { as the placeholder prefix and } as the placeholder suffix, e.g. {deltaDate}.
This is the only service that decides whether the files of a brand org code have been zipped based on the presence of a file on S3 rather than on metadata in MongoDB. So if the target zip already exists, the files for that brand org code are considered zipped.
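Template resolution and the S3-based skip check can be sketched together. Assumptions: {cdpRegion} maps to the message's region value, which matches the example zip name amer_usa1_MJNDEV1_CDP_Extract_20211001.zip on this page; the zip's location is the directory of the first file path, inferred from the example key; ZipTarget is a hypothetical name, and objectExists is a hypothetical stand-in for an S3 existence check (for example, a HeadObject request). Failing on an unknown placeholder is an illustrative choice, not documented behaviour.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class ZipTarget {
    // A placeholder is a name between the '{' prefix and the '}' suffix, e.g. {deltaDate}.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\{([^{}]+)\\}");

    /** Replaces every {name} in the template; fails fast on an unknown placeholder. */
    static String resolve(String template, Map<String, String> values) {
        Matcher m = PLACEHOLDER.matcher(template);
        StringBuilder out = new StringBuilder();
        while (m.find()) {
            String value = values.get(m.group(1));
            if (value == null) {
                throw new IllegalArgumentException("Unknown placeholder " + m.group() + " in " + template);
            }
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    /** The zip is written alongside the split files: same S3 "directory" as the first file path. */
    static String zipKey(List<String> filePaths, String archiveName) {
        String first = filePaths.get(0);
        return first.substring(0, first.lastIndexOf('/') + 1) + archiveName;
    }

    /** S3 is the only source of truth: an existing zip means the brand org code is already done. */
    static boolean needsZipping(String zipKey, Predicate<String> objectExists) {
        return !objectExists.test(zipKey);
    }
}
```

Because the check looks only at S3, a mistyped template (for example, a missing closing brace) changes the key and can make an already zipped brand org code look unprocessed.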