NUTRINTG (4) File Processor (DEPRECATED/OUTDATED DOCUMENTATION)
File Processor is a service that splits and modifies already decrypted files. Here is the high-level diagram:
Process Flow
A message is polled from the inbound FIFO queue (`aws.sqs.inbound-fifo-queue-url`) every `process.fixed-delay` milliseconds. If a message is received, the service checks whether its delta is the one immediately following the last processed delta stored in the MongoDB metadata. If deltas arrive in an unexpected order, an error email is sent and processing of this message stops. Because the message is not deleted, it returns to the inbound FIFO queue once the visibility timeout configured on the queue itself expires.
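A minimal sketch of the ordering check follows. It assumes that the "next" delta means the last processed delta date plus the configured delta interval in days, with dates in the yyyyMMdd format listed in the properties table. The function names are hypothetical, and reading the last processed delta from MongoDB is left out.

```python
from datetime import date, datetime, timedelta

# Assumed values, taken from the example column of the properties table.
DELTA_DATE_FORMAT = "%Y%m%d"  # strptime equivalent of the pattern yyyyMMdd
DELTA_INTERVAL_DAYS = 1       # expected interval between consecutive deltas, in days


def parse_delta(delta_date: str) -> date:
    """Parse a deltaDate value such as "20200314" into a date."""
    return datetime.strptime(delta_date, DELTA_DATE_FORMAT).date()


def is_next_expected_delta(last_processed: str, incoming: str,
                           interval_days: int = DELTA_INTERVAL_DAYS) -> bool:
    """Return True if `incoming` is exactly one interval after `last_processed`.

    Assumption: "next delta" means last processed date + interval_days.
    """
    expected = parse_delta(last_processed) + timedelta(days=interval_days)
    return parse_delta(incoming) == expected


if __name__ == "__main__":
    print(is_next_expected_delta("20200313", "20200314"))  # True
    print(is_next_expected_delta("20200313", "20200315"))  # False: a delta is missing
```

When the check fails, the message is simply not deleted, so it reappears on the queue after the visibility timeout.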
If the delta is the next expected one, the service fetches from MongoDB the list of files that have already been processed for this delta (for example, during a previous failed run for the same delta).
A message is split into one message per file.
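The split step can be sketched as follows, assuming the message shape shown in the inbound message example. `FileMessage` and `split_inbound_message` are hypothetical names, not the service's actual classes.

```python
import json
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class FileMessage:
    """Hypothetical split message: one file of one delta."""
    region: str
    delta_date: str
    file_path: str


def split_inbound_message(body: str) -> List[FileMessage]:
    """Create one FileMessage per entry in the inbound "filePaths" array."""
    payload = json.loads(body)
    return [
        FileMessage(payload["region"], payload["deltaDate"], path)
        for path in payload["filePaths"]
    ]


if __name__ == "__main__":
    body = """{"region": "amer", "deltaDate": "20200314",
               "filePaths": ["amer/20200314/rb_amer_customer_20200314120000.dat",
                             "amer/20200314/rb_amer_order_20200314120000.dat"]}"""
    for message in split_inbound_message(body):
        print(message.file_path)
```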
The split messages are processed in an executor channel using `process.processing-thread-pool-size` threads. For every message, the file headers are validated and the file is split by the configured separator field. The delimiter is changed to a comma and every field is wrapped in double quotes. If MongoDB indicates that the file has already been processed, processing of that file is skipped. Processed files are stored in the same S3 folder as the source file, in a subfolder named after the separator field value (usually the Brand Org Code).

When all messages have been processed, aggregation is invoked. If there were any errors, an error email is sent and the message returns to the queue after the configured visibility timeout. If there were no errors, an outbound message is built and sent to the `aws.sqs.outbound-fifo-queue-url` queue, the inbound message is removed from the `aws.sqs.inbound-fifo-queue-url` queue, the processed metadata is stored in MongoDB, and a success email is sent.
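A minimal sketch of the per-file transformation is shown below. It assumes that the first line of each inbound file is the header row and that the header row is repeated at the top of every output file; the documentation does not say either way. `split_file` and `output_path` are hypothetical names, and the S3 download/upload and the MongoDB skip check are omitted.

```python
import csv
import io
import posixpath
from typing import Dict, List


def split_file(content: str, expected_headers: List[str], separator_index: int,
               inbound_separator: str = "|") -> Dict[str, str]:
    """Split one inbound file into CSV texts keyed by the separator field value.

    Assumptions (not documented): the first line is the header row, and the
    header row is written at the top of every output file.
    """
    reader = csv.reader(io.StringIO(content), delimiter=inbound_separator)
    headers = next(reader, [])
    if headers != expected_headers:
        raise ValueError(f"Unexpected headers: {headers}")

    buffers: Dict[str, io.StringIO] = {}
    writers = {}
    for row in reader:
        if not row:  # skip blank lines
            continue
        key = row[separator_index]  # usually the Brand Org Code
        if key not in writers:
            buffers[key] = io.StringIO()
            # Comma delimiter, every field wrapped in double quotes.
            writers[key] = csv.writer(buffers[key], delimiter=",",
                                      quoting=csv.QUOTE_ALL, lineterminator="\n")
            writers[key].writerow(headers)
        writers[key].writerow(row)
    return {key: buffer.getvalue() for key, buffer in buffers.items()}


def output_path(inbound_path: str, separator_value: str) -> str:
    """Place the output file in a subfolder named after the separator value."""
    folder, file_name = posixpath.split(inbound_path)
    return posixpath.join(folder, separator_value, file_name)
```

For example, `output_path("amer/20200314/rb_amer_customer_20200314120000.dat", "TEST1")` yields `amer/20200314/TEST1/rb_amer_customer_20200314120000.dat`, which matches the paths in the outbound message example.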
If an unexpected exception occurs anywhere in the flow, the message is passed to an error channel, which triggers an error email.
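As a rough Python analogue of the fan-out and aggregation (not the service's actual executor channel configuration), the step can be sketched with a fixed-size `ThreadPoolExecutor`. Here `pool_size` stands in for `process.processing-thread-pool-size`, and an exception raised by any worker is collected as an error rather than aborting the other files. `process_all` is a hypothetical name.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, List, Tuple, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def process_all(items: Iterable[T], worker: Callable[[T], R],
                pool_size: int = 2) -> Tuple[List[R], List[Exception]]:
    """Run `worker` on every item in a fixed-size pool and aggregate the outcome.

    Returns (results, errors). The caller is expected to treat a non-empty
    `errors` list as a failed run: send an error email and leave the
    inbound message on the queue.
    """
    results: List[R] = []
    errors: List[Exception] = []
    with ThreadPoolExecutor(max_workers=pool_size) as pool:
        futures = [pool.submit(worker, item) for item in items]
        for future in futures:  # iterate in submission order
            try:
                results.append(future.result())  # re-raises a worker's exception
            except Exception as exc:
                errors.append(exc)
    return results, errors
```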
Inbound Message Example
{
"region": "amer",
"deltaDate": "20200314",
"filePaths": [
"amer/20200314/rb_amer_customer_20200314120000.dat",
"amer/20200314/rb_amer_order_20200314120000.dat"
]
}
Outbound Message Example
{
"region": "amer",
"deltaDate": "20200314",
"fileSets": [
{
"brandOrgCode": "TEST1",
"filePaths": [
"amer/20200314/TEST1/rb_amer_customer_20200314120000.dat",
"amer/20200314/TEST1/rb_amer_order_20200314120000.dat"
]
},
{
"brandOrgCode": "TEST2",
"filePaths": [
"amer/20200314/TEST2/rb_amer_customer_20200314120000.dat",
"amer/20200314/TEST2/rb_amer_order_20200314120000.dat"
]
}
]
}
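Building the outbound message amounts to grouping the output paths by separator value. The sketch below assumes the processing step yields `(brandOrgCode, path)` pairs. The function name and the sorting of file sets and paths are illustrative choices, not documented behavior.

```python
import json
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def build_outbound_message(region: str, delta_date: str,
                           processed: Iterable[Tuple[str, str]]) -> str:
    """Group (brandOrgCode, filePath) pairs into the outbound message JSON."""
    grouped: Dict[str, List[str]] = defaultdict(list)
    for brand_org_code, path in processed:
        grouped[brand_org_code].append(path)
    message = {
        "region": region,
        "deltaDate": delta_date,
        # Sorting is an illustrative choice that makes the output deterministic.
        "fileSets": [
            {"brandOrgCode": code, "filePaths": sorted(paths)}
            for code, paths in sorted(grouped.items())
        ],
    }
    return json.dumps(message, indent=2)
```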
Main Properties to Consider
| Property | Description | Example Value |
|---|---|---|
| `process.fixed-delay` | How often to poll the inbound queue for a message, in milliseconds. | 10000 |
|  | Date format of the delta date in inbound and outbound messages. | yyyyMMdd |
|  | Expected interval between consecutive deltas, in days. | 1 |
| `process.processing-thread-pool-size` | Number of threads used to process split messages. | 2 |
|  | Pattern used to parse data from file names (without the extension). | `rb_.*_(?<entity>.*)_[0-9]{14}` |
|  | Name of the capturing group in the pattern above. | entity |
|  | Separator in inbound files. | `\|` |
|  | Separator in outbound files. | `,` |
|  | List of headers for a given entity in a given region. Must be specified for each supported region and entity. | NAME,LAST_NAME,BRAND_ORG_CODE |
|  | Zero-based position of the field used to split a file of a given entity in a given region, usually the Brand Org Code. Must be specified for each supported region and entity. | 2 |
| `aws.sqs.inbound-fifo-queue-url` | URL of the inbound SQS FIFO (first-in-first-out) queue. |  |
| `aws.sqs.outbound-fifo-queue-url` | URL of the outbound SQS FIFO (first-in-first-out) queue. |  |
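Entity detection from a file name can be sketched as below. This assumes a pattern of the form `rb_.*_(?<entity>.*)_[0-9]{14}` matched against the whole file name without its extension. Python's `re` module writes the named group as `(?P<entity>...)`. `ENTITY_CONFIG` is a hypothetical stand-in for the per-region, per-entity header and separator-position properties, whose actual names are not listed here.

```python
import posixpath
import re
from typing import Dict, List, Tuple

# Assumed pattern; Java's (?<entity>...) is written (?P<entity>...) in Python.
FILE_NAME_PATTERN = re.compile(r"rb_.*_(?P<entity>.*)_[0-9]{14}")

# Hypothetical configuration: (region, entity) -> (headers, separator position).
ENTITY_CONFIG: Dict[Tuple[str, str], Tuple[List[str], int]] = {
    ("amer", "customer"): (["NAME", "LAST_NAME", "BRAND_ORG_CODE"], 2),
}


def extract_entity(file_path: str) -> str:
    """Return the entity name parsed from the file name (extension removed)."""
    stem, _extension = posixpath.splitext(posixpath.basename(file_path))
    match = FILE_NAME_PATTERN.fullmatch(stem)
    if match is None:
        raise ValueError(f"File name does not match the pattern: {file_path}")
    return match.group("entity")


def lookup_config(region: str, file_path: str) -> Tuple[List[str], int]:
    """Find the expected headers and separator position for a file."""
    entity = extract_entity(file_path)
    try:
        return ENTITY_CONFIG[(region, entity)]
    except KeyError:
        raise ValueError(f"No configuration for region={region!r}, entity={entity!r}") from None
```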