NUTRINTG Instant Message Service (IMS)

Overall Concept

The purpose of this service is to send an activity (e.g. user profile creation, update, or order placement) to an SFMC Journey right after the activity is successfully created in the target system.

Currently, the integration with SFMC can take more than 24 hours, which is not satisfactory, since some user activities require immediate action, e.g. a welcome email.

General rules

  • The service should log the key steps of the process. An administrator reading the logs should be able to reconstruct the path a message took through the service and, in case of an error, easily find the root cause

  • Every log should be followed by ‘[${correlationId}] ’

  • If an unexpected error happens, the message should be retried after a defined period of time. The number of retries should be configurable. The delay capability of AnypointMQ can be used; its maximum delay of 15 minutes is acceptable (the delay should be applied only to messages that are being processed for the 2nd or a later time). Any other mechanism for retrying messages is also welcome.
    Unexpected errors are: the external service is unreachable; the external service returns a 5xx error.
    If 4xx is returned, the message should not be retried, since the message will not be fixed automatically.
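
The retry rules above can be sketched as follows. This is a minimal illustration, not the service's implementation: `RetryPolicy`, its default values, the action names, and the convention that `status_code=None` means "the external service is unreachable" are all assumptions made for this example.

```python
from dataclasses import dataclass

# 15-minute ceiling on the AnypointMQ delay, as stated in the rule above.
MAX_ANYPOINT_MQ_DELAY_SECONDS = 15 * 60


@dataclass(frozen=True)
class RetryPolicy:
    """Configurable retry settings (hypothetical names and defaults)."""
    max_retries: int = 3
    delay_seconds: int = 300


def is_retryable(status_code):
    """Unreachable service (None) and 5xx responses are unexpected errors."""
    if status_code is None:
        return True
    return 500 <= status_code <= 599


def next_action(status_code, attempt, policy):
    """Decide what to do after a failed call.

    `attempt` is 1 for the first processing of a message, 2 for the
    first retry, and so on. Returns a tuple (action, delay_in_seconds).
    """
    if not is_retryable(status_code):
        # e.g. 4xx: the message will not be fixed automatically.
        return ("no_retry", 0)
    retries_done = attempt - 1
    if retries_done >= policy.max_retries:
        return ("give_up", 0)
    # The delay only affects the redelivery, i.e. the 2nd or later processing.
    delay = min(policy.delay_seconds, MAX_ANYPOINT_MQ_DELAY_SECONDS)
    return ("retry", delay)
```

Keeping the decision in one pure function makes the retry limit easy to test independently of the queue client.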

  • All sensitive data should be encrypted in the DB (for example, in the cdp-sfmc-journey-service project, clientId and clientSecret are encrypted)

Flowchart

Flowchart description

Read Incoming Message - The IMS service listens to the IMS queue. Once there are messages to process, the service consumes the next message. This can be done in parallel by many threads, since the messages are not correlated with each other. Ideally, the messages are processed in FIFO order.
A separate client should be created for this service. The queue should be created in Anypoint Cloud Hub.
An initial message has the following format:

{ correlationId: String, traceId: String, accountSource: String, sourceAccountNumber: String, data: JSON Object, transientData: JSON Object }

correlationId - is an optional parameter. The value is attached to every log.
traceId - is an optional parameter. The value is used to create ‘trace events’.
accountSource - mandatory parameter. It controls the data flow.
sourceAccountNumber - mandatory parameter.
data - the content depends on the entity type
transientData - additional data that is not supposed to be transformed. It must contain the ‘email’ field, which is used to retrieve the user profile
The initial message can contain other fields, which are not processed by this service
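
As an illustration of the field rules above, the following sketch parses an incoming message and enforces the two mandatory fields. The class name `IncomingMessage`, the error type `InvalidMessageError`, and the choice to default missing `data` and `transientData` to empty objects are assumptions of this example, not part of the specification.

```python
import json
from dataclasses import dataclass, field
from typing import Optional


class InvalidMessageError(ValueError):
    """Raised when a mandatory field is missing (hypothetical error type)."""


@dataclass(frozen=True)
class IncomingMessage:
    account_source: str
    source_account_number: str
    correlation_id: Optional[str] = None
    trace_id: Optional[str] = None
    data: dict = field(default_factory=dict)
    transient_data: dict = field(default_factory=dict)


def parse_incoming_message(raw):
    """Parse the JSON text of a queue message into an IncomingMessage."""
    body = json.loads(raw)
    for name in ("accountSource", "sourceAccountNumber"):
        if not body.get(name):
            raise InvalidMessageError(f"missing mandatory field: {name}")
    # Fields not listed in the format are simply ignored.
    return IncomingMessage(
        account_source=body["accountSource"],
        source_account_number=body["sourceAccountNumber"],
        correlation_id=body.get("correlationId"),
        trace_id=body.get("traceId"),
        data=body.get("data") or {},
        transient_data=body.get("transientData") or {},
    )
```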

Retrieve and store CorrelationId and TraceId - this is done by external libraries. An example is available in the cdp-profile-adapter service.
If no correlationId is found, the library must generate one.
If no traceId is found, then no trace events should be generated.
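
The two rules above amount to the following logic. In the real service this is delegated to the external libraries; the function names here are hypothetical, and the use of a random UUID as the generated correlationId is an assumption of this sketch.

```python
import uuid


def resolve_correlation_id(message):
    """Return the message's correlationId, generating one if it is absent."""
    return message.get("correlationId") or str(uuid.uuid4())


def trace_events_enabled(message):
    """Trace events are generated only when a traceId is present."""
    return bool(message.get("traceId"))
```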

Create ‘Start Processing’ - only if eventId is present. To do this, the appropriate method of the existing library must be called with the proper parameters.
An example of usage can be found in the ‘cdp-profile-adapter’ project.

Read MongoDB configuration - For every message, there should be one and only one record found in the configuration, based on accountSource value.
The collection should be named 'middlewareInstantMessageService'.
The collection should have a unique key consisting of accountSource.
The data structure is the following:

{ _id: ObjectId, accountSource: String, entityType: [PROFILE|ORDER], isCDPTarget: Boolean, APIEvent: String, dataExtensionKey: String }

_id - autogenerated by MongoDB
accountSource - the unique key, used for retrieving the CDP Profile
entityType - one of PROFILE or ORDER (can be extended in the future)
isCDPTarget - indicates if the entity is correlated with CDP system
APIEvent - the value is a part of an outgoing message
dataExtensionKey - the key used to send order data to another service via HTTP (defined only for the ORDER entityType)

Found matching record in DB? - If no record is found, execute the subroutine 'Process abruption', which terminates the process.
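
A minimal sketch of the "one and only one record" rule, using an in-memory list as a stand-in for the 'middlewareInstantMessageService' collection. The exception class `ProcessAbruption` and the function name are hypothetical; a real implementation would query MongoDB instead of filtering a list.

```python
class ProcessAbruption(Exception):
    """Non-graceful termination of message processing (hypothetical class)."""


def find_configuration(records, account_source):
    """Return the single configuration record for the given accountSource.

    `records` stands in for the MongoDB collection. Zero matches, or more
    than one match, aborts the process.
    """
    matches = [r for r in records if r.get("accountSource") == account_source]
    if len(matches) != 1:
        raise ProcessAbruption(
            f"expected exactly one configuration for accountSource "
            f"{account_source!r}, found {len(matches)}"
        )
    return matches[0]
```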

Should message be processed? - Check if the flow should be continued. The message must be processed further when:

  1. The message data entity type is PROFILE and ‘incomingData.operation’ value equals ‘CREATE’

  2. The message data entity type is ORDER and ‘incomingData.data.financial_status’ value equals ‘paid’

If none of the above criteria is met, the message should be dropped and the reason logged
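
The two criteria can be expressed as a single predicate. This sketch assumes that `incoming_data` is the object the criteria call ‘incomingData’ and that the entity type comes from the configuration record; both the function name and that reading are assumptions.

```python
def should_process(entity_type, incoming_data):
    """Return True when the message must be processed further."""
    if entity_type == "PROFILE":
        return incoming_data.get("operation") == "CREATE"
    if entity_type == "ORDER":
        order = incoming_data.get("data") or {}
        return order.get("financial_status") == "paid"
    # Unknown entity types are not processed.
    return False
```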

Is CDP Target? - Check if the configuration field 'isCDPTarget' is 'true'.

Message contains email address? - Check if the incoming message contains 'transientData.email'. If not, execute the subroutine 'Dropping Message', which terminates the process.

Set EmailAddress as ContactKey - Set 'transientData.email' from the incoming message as the ProfileId

Retrieve CDP Profile - An HTTP request should be sent to the search service

The example request is

http://${MULE_INETRNAL_MIDDLEWARE_PROFILE_SEARCH_SERVICE_ADDRESS}/search/v2/profile?emailAddress=${EMAIL_ADDRESS}&accountSource=${ACCOUNT_SOURCE}

where:

MULE_INETRNAL_MIDDLEWARE_PROFILE_SEARCH_SERVICE_ADDRESS - the internal Mule address of the search service. The services will share the same VPC.

EMAIL_ADDRESS - the value of the field 'transientData.email' from the incoming message

ACCOUNT_SOURCE - the value of the field ‘transientData.customerAccountSource’ from the incoming message if it is not empty, otherwise 'accountSource' from the incoming message
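
The substitution rules for the request URL can be sketched as below. The function name and the `service_address` parameter are hypothetical; the query values are URL-encoded, which is an assumption about how the search service expects them.

```python
from urllib.parse import urlencode


def build_profile_search_url(service_address, message):
    """Build the profile search URL from an incoming message (a dict)."""
    transient = message.get("transientData") or {}
    # Prefer transientData.customerAccountSource, fall back to accountSource.
    account_source = transient.get("customerAccountSource") or message["accountSource"]
    query = urlencode(
        {"emailAddress": transient["email"], "accountSource": account_source}
    )
    return f"http://{service_address}/search/v2/profile?{query}"
```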

 

Is Profile Present? - check if the response from the Search Service is positive and the response payload contains the profile (the returned array is not empty)

Set IndividualId as ContactKey and PrefLanguageCode - Set 'IndividualId' from the Profile Data (response from Search Service) as the ContactKey. Additionally, store the 'Language' code and 'IndividualId' from the Profile Data for further usage
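
A sketch of the two steps above, covering only the case where the search response is available as a parsed JSON array. It assumes that 'IndividualId' and 'Language' are top-level fields of the Profile Data and that the first element of the array is the profile to use; the function name and the output keys are hypothetical.

```python
def extract_profile_fields(profiles):
    """Return ContactKey, IndividualId and language code from the search response.

    `profiles` is the JSON array returned by the search service.
    Returns None when the array is empty (no profile present).
    """
    if not profiles:
        return None
    profile = profiles[0]
    individual_id = profile["IndividualId"]
    return {
        "contactKey": individual_id,
        "individualId": individual_id,
        "prefLanguageCode": profile.get("Language"),
    }
```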

Is Profile Entity? - check if the configuration field entityType equals PROFILE

Return message to the IMS queue - returns the original message to the queue (NACK operation). The message should become available for consumption again after a defined period of time. A mechanism should be implemented to avoid falling into an infinite loop

Check Entity Type - route the data flow down one of the possible paths based on the configuration field 'entityType'

Send Order to SFMC - subroutine for sending requests to external service SFMC

Transform Order - transforms the ‘data’ object from the incoming message into the Order Message which is supposed to be sent to the outgoing queue.

The incoming Order Data structure (incomingMessage.data)

 

Order mapping:

 

Transform Profile - transforms the ‘data’ object from the incoming message into the Profile Message which is supposed to be sent to the outgoing queue

The schema of incoming Profile data structure (incomingMessage.data) can be found here https://rb-digital.atlassian.net/wiki/download/attachments/472678437/profile-payload-service-schema_v06.json?version=1&modificationDate=1578921742888&cacheVersion=1&api=v2

Profile mapping

 

Prepare Message - prepare a message to be sent to the Journey Service via the messaging queue. The message queue URL and ID can be found in Anypoint Cloud Hub. The same client should be used as for reading messages from the IMS queue.

Message transformation

Create ‘Process Completed’ Event - only if eventId is present. To do this, the appropriate method of the existing library must be called with the proper parameters.
An example of usage can be found in the ‘cdp-profile-adapter’ project.

Subroutines

Dropping Message - this is a graceful process termination. The message is not processed further, and no exception is thrown. However, a log entry should state that the message will not be processed and give the reason.

Process abruption - this is a non-graceful process termination. It indicates that something went wrong. The message should not be processed further, and an appropriate exception should be thrown and logged.
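
The difference between the two terminations can be illustrated as follows. The function names, the logger name, and the exception class are hypothetical; the log format is deliberately left minimal.

```python
import logging

logger = logging.getLogger("ims")


class ProcessAbruption(Exception):
    """Raised for a non-graceful termination (hypothetical class)."""


def drop_message(reason):
    """Graceful termination: log the reason and stop, without raising."""
    logger.info("message will not be processed: %s", reason)
    return None


def abort_process(reason):
    """Non-graceful termination: log the error and raise an exception."""
    logger.error("processing aborted: %s", reason)
    raise ProcessAbruption(reason)
```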

Send Order data to SFMC - Order data should be transformed and sent via an HTTP POST request to the proper DataExtension in SFMC. Then, regardless of the response, the flow should continue. Ideally, the flow should continue when the response is 400 or when it is the last try; otherwise, the message should be returned to the queue and consumed again after some time
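
The "ideal" behaviour described above can be sketched as a small decision function. The parameter `max_attempts`, the action names, and the convention that `status_code=None` means the service could not be reached are assumptions of this example.

```python
def after_order_send(status_code, attempt, max_attempts):
    """Decide what happens after the POST to the DataExtension.

    `attempt` is 1 for the first try. Returns "continue" or "requeue".
    """
    if status_code is not None and 200 <= status_code < 300:
        return "continue"          # the order data was accepted
    if status_code == 400:
        return "continue"          # a retry would not fix the request
    if attempt >= max_attempts:
        return "continue"          # last try: carry on with the flow anyway
    return "requeue"               # return the message and try again later
```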

Order mapping

Request to the service which communicates with SFMC

Method: POST

Host: internal worker of journey service

Endpoint Path: dataExtension/${DATA_EXTENSION_KEY}/rows

Headers:

  • correlationId (if present)

  • traceId (if present)

Body (application/json type)
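
The request parts listed above can be assembled as in the sketch below. The function name and the returned dictionary shape are hypothetical, the host is left out because it depends on the deployment, and `body` is whatever the Order mapping produces (the mapping itself is not reproduced here).

```python
import json


def build_data_extension_request(data_extension_key, body,
                                 correlation_id=None, trace_id=None):
    """Describe the POST request that carries order data to the journey service."""
    headers = {"Content-Type": "application/json"}
    # correlationId and traceId are sent only if present.
    if correlation_id:
        headers["correlationId"] = correlation_id
    if trace_id:
        headers["traceId"] = trace_id
    return {
        "method": "POST",
        "path": f"dataExtension/{data_extension_key}/rows",
        "headers": headers,
        "body": json.dumps(body),
    }
```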

Dictionary

IMS - Instant Message Service. This is the service to be implemented. Another name is Brilliant Basic.

MD - Middleware. The entire system. It consists of many services that together satisfy the business requirements. IMS is one of the components of the system.

Profile Data - the JSON Object returned in a JSON Array from the HTTP response from middleware-profile-search-service

SFMC - Salesforce Marketing Cloud

DataExtension - a structure in SFMC that stores data.