Glen Mazza's Weblog

Sunday November 26, 2017

Streaming Salesforce notifications to Kafka topics

Salesforce CRM's Streaming API allows for receiving real-time notifications of changes to records stored in Salesforce. To enable this functionality, the Salesforce developer creates a PushTopic channel backed by a SOQL query that defines the changes the developer wishes to be notified of. Record modifications (create, update, delete, etc.) matching the SOQL query are sent on the channel and can be picked up by external systems. Salesforce provides instructions on how its Workbench tool can be used to create, view, and test PushTopic notifications, which is a useful first step. For Java clients, Salesforce also provides a tutorial using an EMPConnector sample project. The username-password version of that sample worked for me following the instructions given in the tutorial, but the tutorial was vague on how to get the bearer-token version to work.
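
Under the covers, a PushTopic is simply a record on Salesforce's PushTopic sObject: a name, an API version, the SOQL query, and a few flags controlling which operations generate notifications. As a rough sketch, here is what such a record might look like expressed as a JSON body POSTed to the REST API's /sobjects/PushTopic endpoint (field values are illustrative and match the quick start's custom Invoice Statement object; Workbench and Apex offer equivalent ways to create the same record):

    {
      "Name": "InvoiceStatementUpdates",
      "ApiVersion": 41.0,
      "Query": "SELECT Id, Name, Status__c, Description__c FROM Invoice_Statement__c",
      "NotifyForOperationCreate": true,
      "NotifyForOperationUpdate": true,
      "NotifyForOperationDelete": true,
      "NotifyForOperationUndelete": true,
      "NotifyForFields": "Referenced"
    }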

For Kafka, Confluent's Jeremy Custenborder has written a Salesforce source connector for placing notifications from a Salesforce PushTopic onto a Kafka topic. His simplified instructions in the GitHub README assume Confluent's packaging of Kafka, including the Confluent-only Schema Registry and Avro-formatted messages. I'm expanding on his instructions a bit to make them more end-to-end and also to show how the connector can be used with plain Apache Kafka, no Schema Registry, and JSON-formatted messages:

  1. Follow the API Streaming Quick Start Using Workbench to configure your SF PushTopic. Before proceeding, make sure records created from the Workbench generate notifications on the SF PushTopic. It's a quick, efficient tutorial.

  2. Create a Connected Application from your force.com account. Connected Apps allow for external access to your Salesforce data. I mostly relied on Calvin Froedge's article for configuring the Connected App.

  3. (Optional) To confirm the Connected App is working properly before moving on to Kafka, you may wish to run the EMPConnector sample mentioned above.

  4. If you haven't already, download a Kafka distribution and expand it; its folder will be referred to as KAFKA_HOME below.
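
    For example (kafka_2.11-1.0.0 is the current release as I write this; substitute whichever version you download):

    tar -xzf kafka_2.11-1.0.0.tgz
    cd kafka_2.11-1.0.0
    export KAFKA_HOME=$(pwd)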

  5. Clone and build the Salesforce source connector in a separate directory.
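
    Assuming the connector still lives at github.com/jcustenborder/kafka-connect-salesforce and builds with Maven (check its README if that has changed), the build amounts to something like:

    git clone https://github.com/jcustenborder/kafka-connect-salesforce.git
    cd kafka-connect-salesforce
    mvn clean package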

  6. Open a terminal window with five tabs. Unless stated otherwise, all commands should be run from the KAFKA_HOME directory.

    1. First and second tabs, activate ZooKeeper and the Kafka broker using the commands listed in Step #2 of the Kafka Quick Start.
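
      For the 1.0.x distribution, those commands are:

      # first tab: start ZooKeeper
      bin/zookeeper-server-start.sh config/zookeeper.properties

      # second tab: start the Kafka broker
      bin/kafka-server-start.sh config/server.properties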

    2. Third tab, create a Kafka topic to receive the notifications placed on the Salesforce PushTopic:

      bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sf_invoice_statement__c
      

      The name of the topic can be different from the one given above; just be sure to update the connector configuration file given in the next step accordingly.
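
      You can verify the topic was created with:

      bin/kafka-topics.sh --list --zookeeper localhost:2181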

    3. Fourth tab, start the Salesforce Connector. First, navigate to the config folder under the base folder of the connector and make a MySourceConnector.properties file:

      name=connector1
      tasks.max=1
      connector.class=com.github.jcustenborder.kafka.connect.salesforce.SalesforceSourceConnector
      
      # Set these required values
      salesforce.username=your.force.com.username@xxx.com
      salesforce.password=your.force.com.password
      salesforce.password.token=xxxx
      salesforce.consumer.key=xxxx
      salesforce.consumer.secret=xxxx
      salesforce.push.topic.name=InvoiceStatementUpdates
      salesforce.push.topic.create=false
      kafka.topic=sf_${_ObjectType}
      

      Notes:

      • The password token can be obtained via Force.com's Reset My Security Token screen.
      • The consumer key and secret are available from the Connected App configuration within Force.com.
      • Having already created the Salesforce PushTopic, I set salesforce.push.topic.create to false in the configuration above. Alternatively, I could have set it to true and provided the salesforce.object property to have the Salesforce Connector create the PushTopic dynamically. However, the auto-created PushTopic did not (at least for me) bring in all the fields of the object (the "description" field, Description__c, was missing from InvoiceStatement notifications); a manually created PushTopic will (in most cases) provide the fields given in the SELECT list of its SOQL query.
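
      For comparison, the auto-create route would look something like the following in the properties file (a sketch only; as noted above, I stayed with the manually created PushTopic):

      # alternative: have the connector create the PushTopic itself
      salesforce.push.topic.name=InvoiceStatementUpdates
      salesforce.push.topic.create=true
      salesforce.object=Invoice_Statement__c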

      Next, from the Connector base folder, create the CLASSPATH and activate the connector as follows:

      export CLASSPATH="$(find target/ -type f -name '*.jar'| tr '\n' ':')"
      $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/MySourceConnector.properties 
      

      Important: the export statement above is different from the one in the GitHub instructions; I created a not-yet-applied PR to fix the latter.

    4. Fifth tab, create a consumer to read the Salesforce messages that the connector places on our Kafka topic; this worked for me:

      bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sf_invoice_statement__c --from-beginning
      
  7. Use the Workbench Insert page to create a new Invoice Statement record.

    Once the record is created, you should see a message for it output by the Kafka consumer, confirming that the notification reached the Kafka topic:

    {"schema":
       {"type":"struct","fields":[
          {"type":"string","optional":false,"doc":"Unique identifier for the object.","field":"Id"},
          {"type":"string","optional":true,"field":"OwnerId"},
          {"type":"boolean","optional":true,"field":"IsDeleted"},
          {"type":"string","optional":true,"field":"Name"},
          {"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"CreatedDate"},
          {"type":"string","optional":true,"field":"CreatedById"},
          {"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"LastModifiedDate"},
          {"type":"string","optional":true,"field":"LastModifiedById"},
          {"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"SystemModstamp"},
          {"type":"string","optional":true,"field":"Status__c"},
          {"type":"string","optional":true,"field":"Description__c"},
          {"type":"string","optional":true,"field":"_ObjectType"},
          {"type":"string","optional":true,"field":"_EventType"}],
       "optional":false,"name":"com.github.jcustenborder.kafka.connect.salesforce.Invoice_Statement__c"},
       "payload":{"Id":"a001I000004PNEZQA4","OwnerId":null,"IsDeleted":null,"Name":"INV0014","CreatedDate":null,
          "CreatedById":null,"LastModifiedDate":null,"LastModifiedById":null,"SystemModstamp":null, "Status__c":"Negotiating",
          "Description__c":"Hello 11/27","_ObjectType":"Invoice_Statement__c","_EventType":"created"}}
    
