Pub/Sub without CloudEvents

Dapr uses CloudEvents to provide additional context to the event payload, enabling features like:

  • Tracing
  • Deduplication by message ID
  • Content-type for proper deserialization of the event’s data

For more information about CloudEvents, read the CloudEvents specification.

When adding Dapr to your application, some services may still need to communicate via raw pub/sub messages not encapsulated in CloudEvents. This may be for compatibility reasons, or because some apps are not using Dapr. Dapr enables apps to publish and subscribe to raw events that are not wrapped in a CloudEvent.
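To make the difference concrete, the sketch below wraps a payload in a minimal CloudEvents 1.0 style envelope and compares it with the raw message. The `wrap_in_cloudevent` helper and the `source` and `type` values are hypothetical illustrations, not what Dapr emits verbatim; only the attribute names (`specversion`, `id`, `source`, `type`, `datacontenttype`, `data`) come from the CloudEvents specification.

```python
import json
import uuid


def wrap_in_cloudevent(payload: dict, source: str, event_type: str) -> dict:
    """Hypothetical helper: wrap a payload in a minimal CloudEvents 1.0 envelope."""
    return {
        "specversion": "1.0",
        "id": str(uuid.uuid4()),
        "source": source,
        "type": event_type,
        "datacontenttype": "application/json",
        "data": payload,
    }


order = {"order-number": "345"}

# What a consumer would have to parse when the message is wrapped.
wrapped = json.dumps(wrap_in_cloudevent(order, "checkout", "com.example.order"))

# What a consumer sees for a raw message: only the payload itself.
raw = json.dumps(order)

print(wrapped)
print(raw)
```

A consumer that does not understand CloudEvents can read the raw form directly, whereas the wrapped form requires it to look inside `data`.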

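Warning

Not using CloudEvents disables the features listed above for those messages: tracing, deduplication by message ID, and content-type metadata.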

Dapr apps are able to publish raw events to pub/sub topics without CloudEvent encapsulation, for compatibility with non-Dapr apps.

To disable CloudEvent wrapping, set the rawPayload metadata entry to true as part of the publishing request. This allows subscribers to receive these messages without having to parse the CloudEvent schema.

    import json

    from dapr.clients import DaprClient

    with DaprClient() as d:
        req_data = {
            'order-number': '345'
        }
        # Publish the payload as-is; rawPayload tells Dapr to skip CloudEvent wrapping
        resp = d.publish_event(
            pubsub_name='pubsub',
            topic_name='TOPIC_A',
            data=json.dumps(req_data),
            publish_metadata={'rawPayload': 'true'}
        )
        # Print the request
        print(req_data, flush=True)
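The same flag can be passed when publishing through the Dapr HTTP API, where metadata is supplied as `metadata.<key>` query parameters on the publish endpoint. A minimal sketch using only the standard library follows; the helper names, the port `3500`, and the pub/sub and topic names are assumptions for illustration, and the request only succeeds with a Dapr sidecar listening on that port.

```python
import json
import urllib.parse
import urllib.request


def build_publish_url(pubsub_name: str, topic: str, dapr_port: int = 3500) -> str:
    """Build a Dapr publish URL that asks the sidecar to skip CloudEvent wrapping."""
    query = urllib.parse.urlencode({"metadata.rawPayload": "true"})
    return f"http://localhost:{dapr_port}/v1.0/publish/{pubsub_name}/{topic}?{query}"


def publish_raw(pubsub_name: str, topic: str, payload: dict) -> int:
    """POST the payload as JSON to the sidecar and return the HTTP status code."""
    request = urllib.request.Request(
        build_publish_url(pubsub_name, topic),
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return response.status


# Building the URL needs no sidecar; publish_raw() does.
print(build_publish_url("pubsub", "TOPIC_A"))
```

Keeping the URL construction separate from the network call makes the flag easy to check without a running sidecar.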

[Diagram: subscribing with Dapr when the publisher does not use Dapr or CloudEvents]

When subscribing programmatically, add the additional metadata entry for rawPayload so the Dapr sidecar automatically wraps the payloads into a CloudEvent that is compatible with current Dapr SDKs.

    import json

    import flask
    from flask import request, jsonify
    from flask_cors import CORS

    app = flask.Flask(__name__)
    CORS(app)

    @app.route('/dapr/subscribe', methods=['GET'])
    def subscribe():
        # rawPayload tells the sidecar the incoming messages are not CloudEvents
        subscriptions = [{'pubsubname': 'pubsub',
                          'topic': 'deathStarStatus',
                          'route': 'dsstatus',
                          'metadata': {
                              'rawPayload': 'true',
                          }}]
        return jsonify(subscriptions)

    @app.route('/dsstatus', methods=['POST'])
    def ds_subscriber():
        print(request.json, flush=True)
        return json.dumps({'success': True}), 200, {'Content-Type': 'application/json'}

    app.run()
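Because the sidecar wraps the raw message before delivery, the handler still receives a CloudEvent-shaped JSON body. The sketch below shows one way to pull the original payload back out. It assumes the wrapped event follows the CloudEvents JSON format, where the payload sits in `data` or, for binary content, base64-encoded in `data_base64`; which field Dapr populates for a given message is an assumption to verify against your runtime version, and `extract_payload` is a hypothetical helper.

```python
import base64
import json


def extract_payload(event: dict) -> bytes:
    """Return the original message bytes from a CloudEvent-shaped dict."""
    if "data_base64" in event:
        # Binary payloads are base64-encoded in the CloudEvents JSON format.
        return base64.b64decode(event["data_base64"])
    data = event.get("data")
    if isinstance(data, str):
        return data.encode("utf-8")
    # Structured data (dict, list, number, None) is re-serialized as JSON.
    return json.dumps(data).encode("utf-8")


# Hypothetical wrapped event, as a handler might see it in request.json.
sample = {
    "specversion": "1.0",
    "data_base64": base64.b64encode(b'{"status": "completed"}').decode("ascii"),
}
print(json.loads(extract_payload(sample)))
```

In the Flask handler, this would be called as `extract_payload(request.json)` before any application-specific parsing.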

Similarly, you can subscribe to raw events declaratively by adding the rawPayload metadata entry to your Subscription Custom Resource Definition (CRD):

    apiVersion: dapr.io/v1alpha1
    kind: Subscription
    metadata:
      name: myevent-subscription
    spec:
      topic: deathStarStatus
      route: /dsstatus
      pubsubname: pubsub
      metadata:
        rawPayload: "true"
    scopes:
    - app2