Ownership Transfer Flink Job

The UserOwnershipTransferFunction job facilitates the transfer of course batch ownership from one user to another. It achieves this by performing database and search updates to ensure consistent data. The job interacts with several data sources, including Cassandra and Elasticsearch, to locate the relevant information and update it accordingly. Here's what the job does in simpler terms:

  • Identify Ownership Transfers:

    • This job takes an event indicating an ownership transfer between users.

    • It processes the event to determine the source user (fromUserId) and the target user (toUserId).

  • Database Operations:

    • It connects to Cassandra and retrieves information about course batches created by the source user and where they act as a mentor.

    • The job creates Cassandra update queries to change the ownership from the source user to the target user in the appropriate fields.

    • It executes these updates in batches to ensure efficient processing.

  • Search Operations:

    • It interacts with Elasticsearch to find the corresponding course batch documents.

    • The job updates the documents to reflect the new ownership, replacing the source user with the target user in the appropriate fields.

  • Metrics and Logging:

    • The job maintains metrics to track the number of processed events, successful operations, and database updates.

    • It logs detailed information for auditing and error handling.

In summary, this job facilitates the smooth transfer of course ownership, updating relevant data in multiple locations to maintain consistency across the system. It plays a crucial role in ensuring that the information in both the database and search systems accurately reflects the ownership transfer.

Configuration variables:

VariableDefault valuepurpose



Kafka topic from which messages/events are read to be processed



Kafka input topic group Id



Cassandra keyspace name



Cassandra table used to store batch details of a collection. Batch status, start date , end date , batch enrolment end date, enrolment type (open/invite-only), certificate templates etc are stored in this table.



Property used to specify batch size of the database update queries while updating a specific cassandra table in batch format


lms base url


User-Org service URL



API route for fetching user profile details



API route for fetching batch details



Degree of parallelism for the user ownership

Sample event:

    "eid": "BE_JOB_REQUEST",
    "ets": 1712750750956,
    "mid": "LP.1712750750956.07a0a24d-37ef-462c-a614-b76ad2a6a6ac",
    "actor": {
        "type": "System",
        "id": "ownership-transfer"
    "context": {
        "pdata": {
            "ver": "1.0",
            "id": "org.sunbird.platform"
    "object": {
        "type": "user",
        "id": "72d8cd69-2469-4234-82e7-6b849e0a28d9"
    "edata": {
        "organisationId": "01394517023437619214_1111",
        "actionBy": {
            "userId": "ad8c3adf-2447-4559-af15-f6d1057a0b8a",
            "userName": "gtest-user-007"
        "context": "User Deletion",
        "action": "ownership-transfer",
        "fromUserProfile": {
            "userId": "72d8cd69-2469-4234-82e7-6b849e0a28d9",
            "userName": "gtest-user-005",
            "channel": "",
            "organisationId": "",
            "roles": [
        "iteration": 1,
        "assetInformation": {
            "name": "TestContent",
            "identifier": "do_123",
            "primaryCategory": "Practice Question Set",
            "objectType": "QuestionSet"
        "toUserProfile": {
            "userId": "4c009ce1-b069-4d27-879b-605c55ff4ef9",
            "userName": "gtest-user-006",
            "firstName": "G-Test",
            "lastName": "User-006",
            "roles": [

Source code:

Last updated