diff --git a/provider/indexer-gc/docs/anthos/README.md b/provider/indexer-gc/docs/anthos/README.md index e86460c693608d9bb6eb3c7a03bc0503807919e7..70276024394e1452e3380c89cefc0e2603ea66df 100644 --- a/provider/indexer-gc/docs/anthos/README.md +++ b/provider/indexer-gc/docs/anthos/README.md @@ -40,6 +40,8 @@ Usage of spring profiles is preferred. | `PARTITION_AUTH_ENABLED` | ex `true` or `false` | Disable or enable auth token provisioning for requests to Partition service | no | - | | `OQMDRIVER` | `rabbitmq` or `pubsub` | Oqm driver mode that defines which message broker will be used | no | - | | `SERVICE_TOKEN_PROVIDER` | `GCP` or `OPENID` | Service account token provider, `GCP` means use Google service account `OPEIND` means use OpenId provider like `Keycloak` | no | - | +| `RABBITMQ_RETRY_DELAY` | ex `20000` | Message retry interval after unsuccessful processing | no | - | +| `RABBITMQ_RETRY_LIMIT` | ex `5` | Number of retries to send a message after unsuccessful processing | no | - | ### Properties set in Partition service: @@ -190,15 +192,15 @@ curl -L -X PATCH 'https://dev.osdu.club/api/partition/v1/partitions/opendes' -H RabbitMq should have exchanges and queues with names and configs: -| EXCHANGE NAME | EXCHANGE CONFIG | Target queue name | Target queue config | -|------------------------------|-------------------------------------|--------------------------------------|--------------------------------------------------------------------------------------------------------------------------------| -| indexing-progress | `Type fanout` <br/>`durable: true` | (Consumer not implemented) | (Consumer not implemented) | -| records-changed | `Type fanout` <br/>`durable: true` | indexer-records-changed | `x-delivery-limit: 5`<br/>`x-dead-letter-exchange: records-changed-dead-letter`<br/>`x-queue-type: quorum`<br/>`durable: true` | -| records-changed-dead-letter | `Type fanout` <br/>`durable: true` | indexer-records-changed-dead-letter | `x-queue-type: classic`<br/>`durable: true`<br/>`x-message-ttl: 604800000` | -| reprocess | `Type fanout` <br/>`durable: true` | indexer-reprocess | `x-delivery-limit: 5`<br/>`x-dead-letter-exchange: reprocess-dead-letter`<br/>`x-queue-type: quorum`<br/>`durable: true` | -| reprocess-dead-letter | `Type fanout` <br/>`durable: true` | indexer-reprocess-dead-letter | `x-queue-type: classic`<br/>`durable: true`<br/>`x-message-ttl: 604800000` | -| schema-changed | `Type fanout` <br/>`durable: true` | indexer-schema-changed | `x-delivery-limit: 5`<br/>`x-dead-letter-exchange: schema-changed-dead-letter`<br/>`x-queue-type: quorum`<br/>`durable: true` | -| schema-changed-dead-letter | `Type fanout` <br/>`durable: true` | indexer-schema-changed-dead-letter | `x-queue-type: classic`<br/>`durable: true`<br/>`x-message-ttl: 604800000` | +| EXCHANGE NAME | EXCHANGE CONFIG | Target queue name | Target queue config | +|----------------------------------|-----------------------------------------------------------------------------|---------------------------|----------------------------------------------------------------------| +| indexing-progress | `Type fanout` <br/>`durable: true` | (Consumer not implemented) | (Consumer not implemented) | +| records-changed | `Type fanout` <br/>`durable: true` | indexer-records-changed | `x-delivery-limit: 5`<br/>`x-queue-type: quorum`<br/>`durable: true` | +| indexer-records-changed-exchange | `Type x-delayed-message` <br/>`durable: true`<br/>`x-delayed-type: fanout` | indexer-records-changed | `x-delivery-limit: 5`<br/>`x-queue-type: quorum`<br/>`durable: true` | +| reprocess | `Type fanout` <br/>`durable: true` | indexer-reprocess | `x-delivery-limit: 5`<br/>`x-queue-type: quorum`<br/>`durable: true` | +| indexer-reprocess-exchange | `Type x-delayed-message` <br/>`durable: true`<br/>`x-delayed-type: fanout` | indexer-reprocess | `x-delivery-limit: 5`<br/>`x-queue-type: quorum`<br/>`durable: true` | +| schema-changed | `Type fanout` <br/>`durable: true` | indexer-schema-changed | `x-delivery-limit: 5`<br/>`x-queue-type: quorum`<br/>`durable: true` | +| indexer-schema-changed-exchange | `Type x-delayed-message` <br/>`durable: true`<br/>`x-delayed-type: fanout` | indexer-schema-changed | `x-delivery-limit: 5`<br/>`x-queue-type: quorum`<br/>`durable: true` | ## Keycloak configuration diff --git a/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/IndexerOqmMessageReceiver.java b/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/IndexerOqmMessageReceiver.java index 3d19f8e6fb68bc5a22a1ffa80a6ea306da1714f0..973372f431e5d29c333affcd43932736132dbe16 100644 --- a/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/IndexerOqmMessageReceiver.java +++ b/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/IndexerOqmMessageReceiver.java @@ -60,8 +60,7 @@ public abstract class IndexerOqmMessageReceiver implements OqmMessageReceiver { int statusCode = appException.getError().getCode(); if (statusCode > 199 && statusCode < 300 && statusCode != RequestStatus.INVALID_RECORD) { log.info( - "Event : {}, was not processed, and will NOT be rescheduled.", - oqmMessage, + "Event id : " + oqmMessage.getId() + ", was not processed, and will NOT be rescheduled.", appException ); acked = true; @@ -69,16 +68,15 @@ public abstract class IndexerOqmMessageReceiver implements OqmMessageReceiver { //It is possible to get both AppException with wrapped in original Exception or the original Exception without any wrapper Exception exception = Optional.ofNullable(appException.getOriginalException()).orElse(appException); log.warn( - "Event : {}, was not processed, and will BE rescheduled.", - oqmMessage, + "Event id : " + oqmMessage.getId() + ", was not processed, and will BE rescheduled.", exception ); } } catch (Exception exception) { log.error( - "Error, Event : {}, was not processed, and will BE rescheduled.", - oqmMessage, - exception); + "Event id : " + oqmMessage.getId() + ", was not processed, and will BE rescheduled.", + exception + ); } finally { if (!acked) { oqmAckReplier.nack(); diff --git a/provider/indexer-gc/src/main/resources/application-anthos.properties b/provider/indexer-gc/src/main/resources/application-anthos.properties index e33d2787bfea0e4f92edfd0276745c590fcc4078..491a6ee820619c5ef157905ab94779d8580a14e3 100644 --- a/provider/indexer-gc/src/main/resources/application-anthos.properties +++ b/provider/indexer-gc/src/main/resources/application-anthos.properties @@ -4,4 +4,5 @@ partition-auth-enabled=false openid.provider-url= openid.provider-client-id= openid.provider-client-secret= -rabbitmq.retry.limit=3 +rabbitmq-retry-delay=20000 +rabbitmq-retry-limit=5