
Data Notification

Table of Contents

Introduction

The OSDU notification system lets interested consumers subscribe to data and metadata changes using a publish/subscribe pattern.

A typical workflow using notification is:

  • The consumer finds a "topic" whose changes in OSDU they want to keep up to date with.
  • The consumer creates a push endpoint to receive notifications for that topic.
  • The consumer creates a Subscription in the OSDU Notification system and proves ownership of the push endpoint.
  • The consumer starts receiving notifications for that topic and processes each message to stay synchronized with the OSDU state.
  • The consumer periodically rotates the "secret" used for the subscription.

The sections below describe in detail the steps and APIs that let consumers build such integrated workflows with OSDU Notification.

Back to Table of Contents

Steps

Get topics available to subscribe

The consumer uses the Data Notification "topics" API to view the list of supported notification topics and a sample message for each.

GET /api/register/v1/topics
curl
curl --request GET \
  --url 'https://register-svc.osdu.com/api/register/v1/topics' \
  --header 'Authorization: Bearer <JWT>' \
  --header 'Content-Type: application/json' \
  --header 'data-partition-id: common'
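
For consumers working in Java, the same request could be built with the JDK's `java.net.http` client (Java 11 or later). This is a minimal sketch, not an official OSDU client: the class and method names are hypothetical, and the host, token and partition values are simply those from the curl example above.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper class; not part of any OSDU client library.
final class TopicsRequestSketch {

    // Builds (but does not send) the GET request for the topics API.
    static HttpRequest buildTopicsRequest(String baseUrl, String jwt, String partitionId) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/api/register/v1/topics"))
                .header("Authorization", "Bearer " + jwt)
                .header("Content-Type", "application/json")
                .header("data-partition-id", partitionId)
                .GET()
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request =
                buildTopicsRequest("https://register-svc.osdu.com", "<JWT>", "common");
        // Prints the method and target URL of the request that would be sent.
        System.out.println(request.method() + " " + request.uri());
    }
}
```

The request can then be sent with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`, which returns the JSON body as a string.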

A sample response is shown below. Note the "name" of each topic: it is required when creating a Subscription for that topic.

Sample response
[
  {
    "name": "recordstopic",
    "description": "This notification is sent whenever a new record or record version is created, updated or deleted in storage. 'previousVersionKind' is noted upon 'kind' update. Record deletion is noted as a soft 'deletionType'. Record purge is noted as a hard 'deletionType'.",
    "state": "ACTIVE",
    "example": [
      {
        "id": "osdu:abc:123",
        "kind": "osdu:petrel:regularheightfieldsurface:1.0.0",
        "op": "create"
      },
      {
        "id": "osdu:abc:345",
        "kind": "osdu:petrel:regularheightfieldsurface:1.0.1",
        "op": "update",
        "previousVersionKind": "osdu:petrel:regularheightfieldsurface:1.0.0"
      },
      {
        "id": "osdu:abc:567",
        "kind": "osdu:petrel:regularheightfieldsurface:1.0.0",
        "op": "delete",
        "deletionType": "soft"
      },
      {
        "id": "osdu:abc:789",
        "kind": "osdu:petrel:regularheightfieldsurface:1.0.0",
        "op": "delete",
        "deletionType": "hard"
      }
    ]
  },
  {
    "name": "schemachangedtopic",
    "description": "This notification is sent whenever a new schema is created or updated via schema-service.",
    "state": "ACTIVE",
    "example": [
      {
        "kind": "osdu:wks:wellbore:1.0.0",
        "op": "update"
      },
      {
        "kind": "osdu:wks:wellbore:2.0.0",
        "op": "create"
      }
    ]
  },
  {
    "name": "statuschangedtopic",
    "description": "Every Service/Stage would publish their respective status changed information in this topic.",
    "state": "ACTIVE",
    "example": [
      {
        "kind": "status",
        "properties": {
          "correlationId": "12345",
          "recordId": "osdu:file:3479d828-a47d-4e13-a1f8-9791a19e1a7e",
          "recordIdVersion": "1610537924768407",
          "stage": "STORAGE_SYNC",
          "status": "FAILED",
          "message": "acl is not valid",
          "errorCode": 400,
          "timestamp": 1622118996000
        }
      },
      {
        "kind": "dataSetDetails",
        "properties": {
          "correlationId": "12345",
          "dataSetId": "12345",
          "dataSetIdVersion": "1",
          "dataSetType": "FILE",
          "recordCount": 10,
          "timestamp": 1622118996000
        }
      }
    ]
  }
]
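
To illustrate how a consumer might process "recordstopic" messages like the samples above, the following sketch applies a batch of record changes to a local index. It assumes the messages have already been parsed from JSON. The class names are hypothetical, and treating soft and hard deletions identically is a simplifying assumption rather than documented OSDU behavior.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical consumer-side types; field names follow the "recordstopic" samples.
final class RecordSyncSketch {

    static final class RecordChange {
        final String id;
        final String kind;
        final String op;

        RecordChange(String id, String kind, String op) {
            this.id = id;
            this.kind = kind;
            this.op = op;
        }
    }

    // Applies each change to a local view that maps record id -> kind.
    static void apply(Map<String, String> localIndex, List<RecordChange> changes) {
        for (RecordChange change : changes) {
            switch (change.op) {
                case "create":
                case "update":
                    localIndex.put(change.id, change.kind);
                    break;
                case "delete":
                    // Assumption: soft and hard deletions both drop the local entry.
                    localIndex.remove(change.id);
                    break;
                default:
                    throw new IllegalArgumentException("Unknown op: " + change.op);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> index = new HashMap<>();
        apply(index, List.of(
                new RecordChange("osdu:abc:123", "osdu:petrel:regularheightfieldsurface:1.0.0", "create"),
                new RecordChange("osdu:abc:123", "osdu:petrel:regularheightfieldsurface:1.0.1", "update")));
        System.out.println(index);
    }
}
```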

Back to Table of Contents

Subscribing to a topic

The consumer uses the Data Notification Subscription API to create a Subscription for the topic of interest. The response returns a subscription ID, which can be used later to retrieve the subscription details or to delete the subscription.

POST /api/register/v1/subscription/

To subscribe to a topic, consumers must have an HTTPS endpoint that supports both the "GET" and "POST" methods. "GET" is used as a challenge endpoint when creating (or updating) a subscription, to validate that the consumer owns the endpoint. "POST" is used to push notifications to the consumer.

The challenge is performed only when creating a Subscription or when updating the secret of a Subscription.

Below are the details of the two types of Subscriptions and the challenge process:

Hash-based Message Authentication Code (HMAC) Subscription using a "secret" string

curl
  curl --request POST \
    --url 'https://register-svc.osdu.com/api/register/v1/subscription' \
    --header 'Authorization: Bearer <JWT>' \
    --header 'Content-Type: application/json' \
    --header 'data-partition-id: common' \
    --data '{
      "name": "testSubscription",
      "description": "Description",
      "topic": "records-changed",
      "pushEndpoint": "<DomainEndpoint>",
      "secret": {
        "secretType": "HMAC",
        "value": "testSecret"
      }
    }'
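
The subscription request above could also be assembled in Java. The sketch below builds the JSON body and the POST request with the JDK's `java.net.http` API (Java 11 or later) without sending it. The class and helper names are hypothetical, and the hand-rolled `quote` helper only escapes backslashes and double quotes; a real client would more likely use a JSON library.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper class; not part of any OSDU client library.
final class SubscriptionRequestSketch {

    // Wraps a value in double quotes, escaping backslashes and double quotes only.
    static String quote(String value) {
        return "\"" + value.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // Builds the JSON body for an HMAC subscription, using the fields from the curl example.
    static String buildBody(String name, String description, String topic,
                            String pushEndpoint, String secret) {
        return "{"
                + "\"name\":" + quote(name) + ","
                + "\"description\":" + quote(description) + ","
                + "\"topic\":" + quote(topic) + ","
                + "\"pushEndpoint\":" + quote(pushEndpoint) + ","
                + "\"secret\":{\"secretType\":\"HMAC\",\"value\":" + quote(secret) + "}"
                + "}";
    }

    // Builds (but does not send) the POST request that creates the subscription.
    static HttpRequest buildRequest(String baseUrl, String jwt, String partitionId, String body) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/api/register/v1/subscription"))
                .header("Authorization", "Bearer " + jwt)
                .header("Content-Type", "application/json")
                .header("data-partition-id", partitionId)
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }
}
```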

Before creating an HMAC Subscription, the consumer needs to make sure that the endpoint being registered with OSDU Notification supports "GET" and accepts the query parameters "crc" and "hmac". OSDU Notification sends a "GET" request to this endpoint with a random crc and expects a response hash generated from the crc and the secret value (i.e. "testSecret" in the example above).
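
As a rough illustration of the response hash, the sketch below computes it using only JDK classes. It mirrors the construction used by the sample Java code later in this section (SHA-256 of secret + crc, hex-encoded, then Base64-encoded). That construction is taken from the sample, not from a formal specification, and the class and method names here are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

// Hypothetical JDK-only helper for answering the challenge request.
final class ChallengeResponseSketch {

    // Returns Base64(hex(SHA-256(secret + crc))).
    static String responseHash(String secret, String crc) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest((secret + crc).getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b & 0xff));
            }
            return Base64.getEncoder()
                    .encodeToString(hex.toString().getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform is required to support SHA-256.
            throw new IllegalStateException("SHA-256 not available", e);
        }
    }

    // Builds the JSON body returned by the challenge endpoint.
    static String challengeBody(String secret, String crc) {
        return "{\"responseHash\":\"" + responseHash(secret, crc) + "\"}";
    }
}
```

The GET handler of the push endpoint would return `challengeBody(secret, crc)` with status 200 once the hmac parameter has been verified.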

In addition, consumers may want to validate the hmac field, which is the signature that will be used when a message is pushed to this endpoint. The push endpoint implementation must verify this signature before processing any message, to ensure that the message comes from OSDU Notification.

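Note: The secret value must contain only alphanumeric characters (no special characters), and its length must be even. The sample Java code below additionally parses the secret with DatatypeConverter.parseHexBinary, which accepts only hexadecimal digits (0-9, a-f, A-F), so a secret such as "testSecret" would be rejected by that code.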
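
A consumer could check a candidate secret against this rule before creating the subscription. The sketch below is one possible reading: it treats "alphanumeric" as ASCII letters and digits and rejects empty strings, both of which are assumptions. It also offers a stricter hexadecimal check, since the sample Java code in this section parses the secret as a hex string. The class and method names are hypothetical.

```java
// Hypothetical pre-flight checks for a subscription secret.
final class SecretCheckSketch {

    // True if the secret is non-empty, of even length, and ASCII alphanumeric.
    static boolean isValidSecret(String secret) {
        if (secret == null || secret.isEmpty() || secret.length() % 2 != 0) {
            return false;
        }
        for (int i = 0; i < secret.length(); i++) {
            char c = secret.charAt(i);
            boolean alphanumeric = (c >= '0' && c <= '9')
                    || (c >= 'a' && c <= 'z')
                    || (c >= 'A' && c <= 'Z');
            if (!alphanumeric) {
                return false;
            }
        }
        return true;
    }

    // Stricter check: additionally requires every character to be a hex digit.
    static boolean isHexSecret(String secret) {
        if (!isValidSecret(secret)) {
            return false;
        }
        for (int i = 0; i < secret.length(); i++) {
            if (Character.digit(secret.charAt(i), 16) < 0) {
                return false;
            }
        }
        return true;
    }
}
```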

Sample API definition for setting up the challenge endpoint
...
paths:
   ...
   ...
  '/consumer':
    get:
      summary: Notification "get" API
      consumes:
        - application/json
      produces:
        - application/json
      parameters:
        - in: query
          name: crc
          type: string
          required: true
        - in: query
          name: hmac
          type: string
          required: true
      responses:
        '200':
          description: OK
          schema:
            $ref: '#/definitions/challengeResponse'

    ...
    ...

definitions:
  challengeResponse:
    type: object
    required:
      - responseHash
    properties:
      responseHash:
        type: string

Sample Java code to generate the hmac signature, validate it, and send a response with the hash

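...
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Collections;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
// javax.xml.bind is no longer bundled with the JDK as of Java 11; add the JAXB API dependency there.
import javax.xml.bind.DatatypeConverter;
// Assumed third-party imports (Guava and Gson), inferred from the calls used below.
import com.google.common.base.Strings;
import com.google.common.hash.Hashing;
import com.google.gson.Gson;
...

@GET
@Path(<DomainEndpoint>)
public Response challenge(@QueryParam("crc") @NotBlank String crc,
                          @QueryParam("hmac") @NotBlank String hmac) throws Exception {
    // Use the same secret that was sent in the subscription create request.
    // Hint: the secret string can be stored as configuration for the service.
    String secret = <getSecretString()>;
    // Reject the challenge unless the hmac signature was produced with this secret.
    verifyHmacSignature(hmac, secret);

    String response = getResponseHash(secret + crc);
    return Response.status(HttpStatus.SC_OK).entity(Collections.singletonMap("responseHash", response)).build();
}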

// Returns the Base64 encoding of the hex-encoded SHA-256 hash of the input.
private String getResponseHash(String input) {
    String response = Hashing.sha256()
            .hashString(input, StandardCharsets.UTF_8)
            .toString();
    return Base64.getEncoder().encodeToString(response.getBytes(StandardCharsets.UTF_8));
}

// Error-message constants such as MISSING_HMAC_SIGNATURE and INVALID_SIGNATURE are assumed to be defined elsewhere.
private static final String HMAC_SHA_256 = "HmacSHA256";
private static final String DATA_FORMAT = "{\"expireMillisecond\": \"%s\",\"hashMechanism\": \"hmacSHA256\",\"endpointUrl\": \"%s\",\"nonce\": \"%s\"}";
private static final String NOTIFICATION_SERVICE = "de-notification-service";
private static final long EXPIRE_DURATION = 30000L;

private void verifyHmacSignature(String hmac, String secret) throws Exception {
    if (Strings.isNullOrEmpty(hmac)) {
        throw new Exception(MISSING_HMAC_SIGNATURE);
    }
    if (Strings.isNullOrEmpty(secret)) {
        throw new Exception(MISSING_SECRET_VALUE);
    }
    String[] tokens = hmac.split("\\.");
    if (tokens.length != 2) {
        throw new Exception(INVALID_SIGNATURE);
    }
    byte[] dataBytes = Base64.getDecoder().decode(tokens[0]);
    String requestSignature = tokens[1];

    String data = new String(dataBytes, StandardCharsets.UTF_8);
    HmacData hmacData = new Gson().fromJson(data, HmacData.class);
    String url = hmacData.getEndpointUrl();
    String nonce = hmacData.getNonce();
    String expireTime = hmacData.getExpireMillisecond();
    if (Strings.isNullOrEmpty(url) || Strings.isNullOrEmpty(nonce) || Strings.isNullOrEmpty(expireTime)) {
        throw new Exception(MISSING_ATTRIBUTES_IN_SIGNATURE);
    }
    String newSignature = getSignedSignature(url, secret, expireTime, nonce);
    if (!requestSignature.equalsIgnoreCase(newSignature)) {
        throw new Exception(INVALID_SIGNATURE);
    }
}

private String getSignedSignature(String url, String secret, String expireTime, String nonce) throws Exception {
    if (Strings.isNullOrEmpty(url) || Strings.isNullOrEmpty(secret) || !StringUtils.isNumeric(expireTime)) {
        throw new Exception(ERROR_GENERATING_SIGNATURE);
    }
    final long expiry = Long.parseLong(expireTime);
    if (System.currentTimeMillis() > expiry) {
        throw new Exception(SIGNATURE_EXPIRED);
    }
    String timeStamp = String.valueOf(expiry - EXPIRE_DURATION);
    String data = String.format(DATA_FORMAT, expireTime, url, nonce);
    try {
        final byte[] signature = getSignature(secret, nonce, timeStamp, data);
        return DatatypeConverter.printHexBinary(signature).toLowerCase();
    } catch (Exception ex) {
        throw new Exception(ERROR_GENERATING_SIGNATURE, ex);
    }
}

// Derives a signing key by chaining HMAC-SHA256 over the nonce, the timestamp and the service name,
// then signs the data with that key. The secret and nonce must be hexadecimal strings.
private byte[] getSignature(String secret, String nonce, String timeStamp, String data) throws Exception {
    final byte[] secretBytes = DatatypeConverter.parseHexBinary(secret);
    final byte[] nonceBytes = DatatypeConverter.parseHexBinary(nonce);
    final byte[] encryptedNonce = computeHmacSha256(nonceBytes, secretBytes);
    final byte[] encryptedTimestamp = computeHmacSha256(timeStamp, encryptedNonce);
    final byte[] signedKey = computeHmacSha256(NOTIFICATION_SERVICE, encryptedTimestamp);
    return computeHmacSha256(data, signedKey);
}

private byte[] computeHmacSha256(final String data, final byte[] key) throws Exception {
    final Mac mac = Mac.getInstance(HMAC_SHA_256);
    mac.init(new SecretKeySpec(key, HMAC_SHA_256));
    return mac.doFinal(data.getBytes(StandardCharsets.UTF_8));
}

private byte[] computeHmacSha256(final byte[] data, final byte[] key) throws Exception {
    final Mac mac = Mac.getInstance(HMAC_SHA_256);
    mac.init(new SecretKeySpec(key, HMAC_SHA_256));
    return mac.doFinal(data);
}
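
To exercise the verification code above without a live OSDU environment, a consumer could generate a test token locally. The sketch below is the inverse of verifyHmacSignature: it produces a value of the form Base64(data) + "." + hex(signature), using the same constants and key derivation as the sample. It uses JDK classes only, and the class and method names are hypothetical. It is derived from the sample code rather than from the Notification service itself, so treat it as a test aid and not as a description of how the service signs requests.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.util.Base64;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

// Hypothetical test helper that mirrors the sample verification logic.
final class HmacTokenSketch {
    private static final String HMAC_SHA_256 = "HmacSHA256";
    private static final String DATA_FORMAT = "{\"expireMillisecond\": \"%s\",\"hashMechanism\": \"hmacSHA256\",\"endpointUrl\": \"%s\",\"nonce\": \"%s\"}";
    private static final String NOTIFICATION_SERVICE = "de-notification-service";
    private static final long EXPIRE_DURATION = 30000L;

    static byte[] hmac(byte[] data, byte[] key) {
        try {
            Mac mac = Mac.getInstance(HMAC_SHA_256);
            mac.init(new SecretKeySpec(key, HMAC_SHA_256));
            return mac.doFinal(data);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("HMAC computation failed", e);
        }
    }

    // Decodes a hex string; stands in for DatatypeConverter.parseHexBinary.
    static byte[] fromHex(String hex) {
        if (hex.length() % 2 != 0) {
            throw new IllegalArgumentException("hex string must have even length");
        }
        byte[] out = new byte[hex.length() / 2];
        for (int i = 0; i < out.length; i++) {
            int hi = Character.digit(hex.charAt(2 * i), 16);
            int lo = Character.digit(hex.charAt(2 * i + 1), 16);
            if (hi < 0 || lo < 0) {
                throw new IllegalArgumentException("not a hex string: " + hex);
            }
            out[i] = (byte) ((hi << 4) | lo);
        }
        return out;
    }

    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02x", b & 0xff));
        }
        return sb.toString();
    }

    // Same derivation chain as getSignature: nonce -> timestamp -> service name -> data.
    static String sign(String url, String hexSecret, String hexNonce, long expireMillis) {
        String timeStamp = String.valueOf(expireMillis - EXPIRE_DURATION);
        String data = String.format(DATA_FORMAT, expireMillis, url, hexNonce);
        byte[] nonceKey = hmac(fromHex(hexNonce), fromHex(hexSecret));
        byte[] timeKey = hmac(timeStamp.getBytes(StandardCharsets.UTF_8), nonceKey);
        byte[] signingKey = hmac(NOTIFICATION_SERVICE.getBytes(StandardCharsets.UTF_8), timeKey);
        return toHex(hmac(data.getBytes(StandardCharsets.UTF_8), signingKey));
    }

    // Builds the full token: Base64(data) + "." + hex(signature).
    static String token(String url, String hexSecret, String hexNonce, long expireMillis) {
        String data = String.format(DATA_FORMAT, expireMillis, url, hexNonce);
        return Base64.getEncoder().encodeToString(data.getBytes(StandardCharsets.UTF_8))
                + "." + sign(url, hexSecret, hexNonce, expireMillis);
    }
}
```

A test could call `token(...)` with an expiry of `System.currentTimeMillis() + 30000` and pass the result as the hmac query parameter of the challenge endpoint.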