Activity Aggregate Updater

The 'activity-aggregate-updater' job updates the course progress of enrolled users and, upon completion of a course, pushes an event to the certificate pre-processor job to validate and issue the certificate.

| Variable | Default Value | Purpose |
| --- | --- | --- |
| kafka.input.topic | {{env}}.coursebatch.job.request | Kafka topic from which messages/events are read to be processed. |
| kafka.output.failed.topic | {{env}}.activity.agg.failed | Kafka topic to which a message is written when an exception occurs while processing an event. |
| kafka.audit.topic | {{env}}.telemetry.raw | Kafka topic to which an audit message is written. |
| kafka.certissue.topic | {{env}}.issue.certificate.request | Kafka topic used to trigger the certificate issue pre-processor job. |
| kafka.groupId | {{env}}-activity-aggregate-updater-group | Kafka input topic group ID. |
| lms-cassandra.keyspace | sunbird_courses | Cassandra keyspace name. |
| lms-cassandra.content_consumption.table | user_content_consumption | Cassandra table that stores content-wise data (content progress, status, etc.) for a user in a collection of a batch. |
| lms-cassandra.user_enrolments.table | user_enrolments | Cassandra table that stores a user's enrolment data for a collection in a particular batch. It also holds the consumption progress, enrolment status, and issued certificate details. |
| lms-cassandra.user_activity_agg.table | user_activity_agg | Cassandra table that stores a user's consumption aggregates for a collection in a batch, such as the count of completed contents. |
| redis.database.relationCache.id | 10 | Redis database index from which computed data such as leafnodes and optionalnodes is read. |
| dedup-redis.host | IP of the de-duplication Redis host | De-duplication Redis is used to remove duplicate events that may be present in the list of events fetched from the Kafka input topic in batches of 'threshold.batch.read.size' (described below). |
| dedup-redis.port | Port | Port of the de-duplication Redis. |
| dedup-redis.database.index | 13 | De-duplication Redis database index. |
| dedup-redis.database.expiry | 604800 | De-duplication Redis expiry time. |
| threshold.batch.read.interval | | NOT USED |
| threshold.batch.read.size | 1000 | Flink stream window size. |
| threshold.batch.write.size | 10 | Batch size of the database update queries when updating a specific Cassandra table in batch mode. |
| activity.module.aggs.enabled | true | Enables or disables the consumption aggregate calculation on course leaf nodes. |
| activity.input.dedup.enabled | true | Enables or disables de-duplication mode for the aggregation job. |
| activity.filter.processed.enrolments | true | If enabled, the activity aggregation process is skipped for user enrolments with status 2 (completed courses). |
| activity.collection.status.cache.expiry | 3600 (seconds) | Expiry time of the TTL cache used to read and store the latest collection 'status' information. If the TTL cache does not have the collection 'status' information, it is read from the Search service configured below and the TTL cache is updated. |
| service.search.basePath | IP of the search service | IP of the search service. |
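The de-duplication and batch-write settings above can be illustrated with a minimal sketch. It is not the job's actual implementation: `InMemoryDedupStore` and `chunked` are hypothetical names, an in-memory dictionary stands in for the de-duplication Redis, the expiry value is assumed to be in seconds, and the de-duplication key is left to the caller because the source does not say how it is derived.

```python
import time
from typing import Dict, Iterator, List, Optional

DEDUP_EXPIRY_SECONDS = 604800  # dedup-redis.database.expiry (assumed to be seconds)
BATCH_WRITE_SIZE = 10          # threshold.batch.write.size


class InMemoryDedupStore:
    """Hypothetical stand-in for the de-duplication Redis: key -> expiry timestamp."""

    def __init__(self, expiry_seconds: int = DEDUP_EXPIRY_SECONDS) -> None:
        self.expiry_seconds = expiry_seconds
        self._expires_at: Dict[str, float] = {}

    def is_duplicate(self, key: str, now: Optional[float] = None) -> bool:
        """Return True if the key was seen and has not expired; otherwise record it."""
        now = time.time() if now is None else now
        expires_at = self._expires_at.get(key)
        if expires_at is not None and expires_at > now:
            return True
        # First sighting (or expired entry): remember the key until it expires.
        self._expires_at[key] = now + self.expiry_seconds
        return False


def chunked(items: List[str], size: int = BATCH_WRITE_SIZE) -> Iterator[List[str]]:
    """Split update queries into groups of at most `size` for batched writes."""
    for start in range(0, len(items), size):
        yield items[start:start + size]
```

With the defaults from the table, a key seen once is treated as a duplicate until 604800 time units have passed, and 25 pending update queries would be written as batches of 10, 10, and 5.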

Sample event:

{
  "eid": "BE_JOB_REQUEST",
  "ets": 1683617553712,
  "mid": "LP.1683617553712.b5899054-aabf-4a4c-b4e1-749f8721a276",
  "actor": {
    "type": "System",
    "id": "Course Batch Updater"
  },
  "context": {
    "pdata": {
      "ver": "1.0",
      "id": "org.sunbird.platform"
    }
  },
  "object": {
    "type": "CourseBatchEnrolment",
    "id": "01379148663062528012_9c559542-9bb0-40bd-92e2-3230aebcb3a5"
  },
  "edata": {
    "contents": [
      {
        "contentId": "do_21310354322092851214725",
        "status": 2
      }
    ],
    "action": "batch-enrolment-update",
    "iteration": 1,
    "batchId": "01379148663062528012",
    "userId": "9c559542-9bb0-40bd-92e2-3230aebcb3a5",
    "courseId": "do_2137914724998758401207"
  }
}
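As a rough illustration of how an event like the one above could be turned into a progress aggregate, the following sketch extracts the enrolment identifiers and counts completed contents against a list of leaf nodes. The function names, the output field names, and the interpretation of content status 2 as "completed" are assumptions made for this example; in the job itself the leaf nodes would come from the relation cache rather than from a hard-coded list.

```python
from typing import Any, Dict, List, Set

COMPLETED_STATUS = 2  # assumption: content status 2 means "completed"

# Trimmed copy of the sample event, keeping only the fields used below.
SAMPLE_EVENT: Dict[str, Any] = {
    "eid": "BE_JOB_REQUEST",
    "edata": {
        "contents": [{"contentId": "do_21310354322092851214725", "status": 2}],
        "action": "batch-enrolment-update",
        "batchId": "01379148663062528012",
        "userId": "9c559542-9bb0-40bd-92e2-3230aebcb3a5",
        "courseId": "do_2137914724998758401207",
    },
}


def completed_contents(event: Dict[str, Any]) -> Set[str]:
    """Return the IDs of contents reported as completed in the event."""
    return {
        content["contentId"]
        for content in event["edata"]["contents"]
        if content.get("status") == COMPLETED_STATUS
    }


def enrolment_progress(event: Dict[str, Any], leaf_nodes: List[str]) -> Dict[str, Any]:
    """Summarise progress for one enrolment (output field names are hypothetical)."""
    edata = event["edata"]
    leaf_set = set(leaf_nodes)
    done = completed_contents(event) & leaf_set  # ignore contents outside the course
    return {
        "userId": edata["userId"],
        "batchId": edata["batchId"],
        "courseId": edata["courseId"],
        "completedCount": len(done),
        "totalCount": len(leaf_set),
        "isComplete": bool(leaf_set) and len(done) == len(leaf_set),
    }
```

Calling `enrolment_progress(SAMPLE_EVENT, ["do_21310354322092851214725"])` reports one of one contents completed and marks the enrolment complete, which is the point at which the job would push an event to the certificate issue topic.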
