Marathon documentation

Contents:

Overview

What is Marathon? Marathon is a push notification platform that makes it very easy to send massive push notifications to tens of millions of users for several different apps.

Features

  • Multi-tenant - Marathon already works for as many apps as you need, just keep adding new ones;
  • Multi-services - Marathon supports both gcm and apns services, but plugging a new one shouldn’t be difficult;
  • Massive Push Notification - Send tens of millions of push notifications and keep track of job status;
  • New Relic Support - Natively support new relic with segments in each API route for easy detection of bottlenecks;
  • Sendgrid Support - Natively support sendgrid and send emails when jobs are created, scheduled, paused or enter circuit break;
  • Easy to deploy - Marathon comes with containers already exported to docker hub for every single of our successful builds. Just pick your choice!

Architecture

Marathon is based on some premises:

  • You have a system that reads kafka topics and send pushs to apns and gcms services;
  • You have a PostgreSQL Database with user tables containing at least the following information:
    • user_id: the identification of an user, it can be repeated for different tokens (e.g. an user with several devices);
    • token: the device token registered in apns or gcm service;
    • locale: the language of the device (ex: en, fr, pt)
    • region: the region of the device (ex: US, FR, BR)
    • tz: the timezone of the device (ex: -0400, -0300, +0100)
  • The apps registered in the Marathon api already have created user tables (in the previous PostgreSQL Database) and Kafka topics for apns and gcm services;

Marathon is composed of three main modules:

  • An API responsible for CRUD of apps, templates and jobs;
  • A worker composed of several sub-workers:
    • A sub-worker responsible for getting the information from the database, building the message and sending to Kafka.
    • A sub-worker responsible for splitting a CSV and creating small devices ID batches.
    • A sub-worker responsible for scheduling push notifiction for users in a CSV file using each user timezone;
    • A sub-worker responsible for building a message given a template and a context and sending it to the app and service corresponding kafka topic;
  • A feedback listener responsible for processing feedbacks of the notifications sent to apns or gcm;

Marathon and Pusher

Marathon can be used with Pusher. In the following image, you can see a full example of the integration of marathon and pusher. Note that this example doesn’t include any routes to send individual messages or to register device tokens (this information must be present the push database, as said in the premises section).

_images/PushSystem.pngexample

The Stack

For the devs out there, our code is in Go, but more specifically:

  • Web Framework - Echo;
  • Database - Postgres >= 9.5;
  • Workers - go-workers using Redis.
  • Kafka and Zookeper;

Who’s Using it

Well, right now, only us at TFG Co, are using it, but it would be great to get a community around the project. Hope to hear from you guys soon!

How To Contribute?

Just the usual: Fork, Hack, Pull Request. Rinse and Repeat. Also don’t forget to include tests and docs (we are very fond of both).

Hosting Marathon

There are two ways to host Marathon: docker or from source.

Docker

Running Marathon with docker is rather simple. Our docker container image comes bundled with the API binary. All you need to do is load balance all the containers and you’re good to go. The API runs at port 8080 in the docker image.

Marathon uses PostgreSQL to store jobs information. The container takes environment variables to specify this connection:

  • MARATHON_DB_HOST - PostgreSQL host to connect to;
  • MARATHON_DB_PORT - PostgreSQL port to connect to;
  • MARATHON_DB_DATABASE - PostgreSQL database to connect to;
  • MARATHON_DB_USER - Password of the PostgreSQL Server to connect to;

Marathon also uses another PostgreSQL database to read tokens information. The container takes environment variables to specify this connection:

  • MARATHON_PUSH_DB_HOST - PostgreSQL host to connect to;
  • MARATHON_PUSH_DB_PORT - PostgreSQL port to connect to;
  • MARATHON_PUSH_DB_DATABASE - PostgreSQL database to connect to;
  • MARATHON_PUSH_DB_USER - Password of the PostgreSQL Server to connect to;

For uploading and reading CSV files Marathon uses AWS S3, so you’ll need to specify the following environment variables as well:

  • MARATHON_S3_BUCKET - AWS S3 bucket containing the csv files;
  • MARATHON_S3_FOLDER - AWS S3 folder containing the csv files;
  • MARATHON_S3_ACCESSKEY - AWS S3 access key;
  • MARATHON_S3_SECRETACCESSKEY - AWS S3 secret;
  • MARATHON_S3_CONTROLGROUPFOLDER - AWS S3 folder containing the control group csv files;

The workers use redis for queueing:

  • MARATHON_WORKERS_REDIS_HOST - Redis host to connect to;
  • MARATHON_WORKERS_REDIS_PORT - Redis port to connect to;
  • MARATHON_WORKERS_REDIS_PASS - Password of the redis server to connect to;

Marathon uses kafka to send push notifications:

  • MARATHON_KAFKA_BOOTSTRAPSERVERS - Kafka servers to connect to (comma separated, without spaces);

The workers need a template for sending push notifications:

  • MARATHON_WORKERS_TOPICTEMPLATE - Kafka topic template;

Finally, the feedback listener uses kafka for receiving the push notifications’ feedbacks from APNS or GCM:

  • MARATHON_FEEDBACKLISTENER_KAFKA_BROKERS - Kafka brokers to connect to (comma separated, without spaces);
  • MARATHON_FEEDBACKLISTENER_KAFKA_TOPICS - Array of kafka topics to read from;
  • MARATHON_FEEDBACKLISTENER_KAFKA_GROUP - Kafka consumer group;
  • MARATHON_FEEDBACKLISTENER_FLUSHINTERVAL - Interval during which the feedback listener caches the feedbacks metrics before updating the job feedbacks in PostgreSQL;

Other than that, there are a couple more configurations you can pass using environment variables:

  • MARATHON_NEWRELIC_KEY - If you have a New Relic account, you can use this variable to specify your API Key to populate data with New Relic API;
  • MARATHON_SENTRY_URL - If you have a sentry server you can use this variable to specify your project’s URL to send errors to.
  • MARATHON_SENDGRID_KEY - If you have a sendgrid account, you can use this variable to specify your API Key for sending emails when jobs are created, scheduled, paused or enter circuit break;

Example command for running with Docker

    $ docker pull tfgco/marathon
    $ docker run -t --rm -e "MARATHON_POSTGRES_HOST=<postgres host>" -e "MARATHON_POSTGRES_PORT=<postgres port>" -p 8080:8080 tfgco/marathon

Source

Left as an exercise to the reader.

Marathon API

Every request other than GET /healthcheck must pass a x-forwarded-email header, otherwise it will return 401 Unauthorized.

Healthcheck Routes

Healthcheck

GET /healthcheck

Validates that the app is still up, including the database connection.

  • Success Response

    • Code: 200

    • Content:

        {
          "healthy": true
        }
      
    • Headers:

      It will add an MARATHON-VERSION header with the current marathon module version.

  • Error Response

    It will return an error if it failed to connect to the database.

    • Code: 500
    • Content:
      {
        "healthy": false
      }
    

App Routes

List Apps

GET /apps

List all apps in Marathon DB.

  • Success Response

    • Code: 200

    • Content:

      [
        {  
          id:        [uuid],
          name:      [string],
          bundleId:  [string],
          createdBy: [string], // email
          createdAt: [int64],  // nanoseconds since epoch
          updatedAt: [int64]   // nanoseconds since epoch
        },
        {  
          id:        [uuid],
          name:      [string],
          bundleId:  [string],
          createdBy: [string], // email
          createdAt: [int64],  // nanoseconds since epoch
          updatedAt: [int64]   // nanoseconds since epoch
        },
        ...
      ]
      
  • Error Response

    It will return an error if no x-forwarded-email header is specified

    • Code: 401

    • Code: 500

    • Content:

      {
        "reason": [string]
      }
      

Create App

POST /apps

Creates a new app with the given parameters.

  • Payload

    {
      "name":                          [string],  // 255 characters max
      "bundleId":                      [string]   // matching ^[a-z0-9]+\\.[a-z0-9]+(\\.[a-z0-9]+)+$
    }
    
  • Success Response

    • Code: 201

    • Content:

      {  
        id:        [uuid],   // generated by marathon
        name:      [string],
        bundleId:  [string],
        createdBy: [string], // email of the authenticated user
        createdAt: [int64],  // nanoseconds since epoch
        updatedAt: [int64]   // nanoseconds since epoch
      }
      
  • Error Response

    It will return an error if no x-forwarded-email header is specified

    • Code: 401

    It will return an error if there is already an app with the same bundleId.

    • Code: 409

    • Content:

      {
        "reason": [string]
      }
      

    It will return an error if there are missing or invalid parameters.

    • Code: 422

    • Content:

      {
        "reason": [string]
      }
      
    • Code: 500

    • Content:

      {
        "reason": [string]
      }
      

Retrieve App

GET /apps/:appId

Retrieves the app that has id appId.

  • Success Response

    • Code: 200

    • Content:

      {  
        id:        [uuid],
        name:      [string],
        bundleId:  [string],
        createdBy: [string], // email
        createdAt: [int64],  // nanoseconds since epoch
        updatedAt: [int64]   // nanoseconds since epoch
      }
      
  • Error Response

    It will return an error if no x-forwarded-email header is specified

    • Code: 401

    • Code: 500

    • Content:

      {
        "reason": [string]
      }
      

Update App

PUT /apps/:appId

Updates the app that has id appId.

  • Payload

    {
      "name":                          [string],  // 255 characters max
      "bundleId":                      [string]   // matching ^[a-z0-9]+\\.[a-z0-9]+(\\.[a-z0-9]+)+$
    }
    
  • Success Response

    • Code: 201

    • Content:

      {  
        id:        [uuid],   // generated by marathon
        name:      [string],
        bundleId:  [string],
        createdBy: [string], // email of the authenticated user
        createdAt: [int64],  // nanoseconds since epoch
        updatedAt: [int64]   // nanoseconds since epoch
      }
      
  • Error Response

    It will return an error if no x-forwarded-email header is specified

    • Code: 401

    It will return an error if there is already an app with the same bundleId.

    • Code: 409

    • Content:

      {
        "reason": [string]
      }
      

    It will return an error if there are missing or invalid parameters.

    • Code: 422

    • Content:

      {
        "reason": [string]
      }
      
    • Code: 500

    • Content:

      {
        "reason": [string]
      }
      

Delete App

DELETE /apps/:appId

Deletes the app that has id appId.

  • Success Response

    • Code: 204
  • Error Response

    It will return an error if no x-forwarded-email header is specified

    • Code: 401

    It will return an error if there are missing or invalid parameters.

    • Code: 422

    • Content:

      {
        "reason": [string]
      }
      
    • Code: 500

    • Content:

      {
        "reason": [string]
      }
      

Template Routes

List app templates

GET /apps/:appId/templates

List all templates for the app with the given id.

  • Success Response

    • Code: 200

    • Content:

      [
        {  
          id:        [uuid],
          name:      [string],
          locale:    [string],
          defaults:  [json],
          body:      [json],
          appId:     [uuid],
          createdBy: [string], // email
          createdAt: [int64],  // nanoseconds since epoch
          updatedAt: [int64]   // nanoseconds since epoch
        },
        {  
          id:        [uuid],
          name:      [string],
          locale:    [string],
          defaults:  [json],
          body:      [json],
          appId:     [uuid],
          createdBy: [string], // email
          createdAt: [int64],  // nanoseconds since epoch
          updatedAt: [int64]   // nanoseconds since epoch
        },
        ...
      ]
      
  • Error Response

    It will return an error if no x-forwarded-email header is specified

    • Code: 401

    • Code: 500

    • Content:

      {
        "reason": [string]
      }
      

Create Template

POST /apps/:appId/templates

Creates a new template with the given parameters.

  • Payload

    {
      name:      [string],
      locale:    [string],
      defaults:  [json],   // cannot be empty
      body:      [json]   // cannot be empty
    }
    
  • Success Response

    • Code: 201

    • Content:

      {  
        id:        [uuid],
        name:      [string],
        locale:    [string],
        defaults:  [json],   // cannot be empty
        body:      [json],   // cannot be empty
        appId:     [uuid],
        createdBy: [string], // email
        createdAt: [int64],  // nanoseconds since epoch
        updatedAt: [int64]   // nanoseconds since epoch
      }
      
  • Error Response

    It will return an error if no x-forwarded-email header is specified

    • Code: 401

    It will return an error if there is already a template with the same appId, name and locale.

    • Code: 409

    • Content:

      {
        "reason": [string]
      }
      

    It will return an error if there are missing or invalid parameters.

    • Code: 422

    • Content:

      {
        "reason": [string]
      }
      
    • Code: 500

    • Content:

      {
        "reason": [string]
      }
      

Retrieve Template

GET /apps/:appId/templates/:templateId

Retrieves the app that has id templateId.

  • Success Response

    • Code: 200

    • Content:

      {  
        id:        [uuid],
        name:      [string],
        locale:    [string],
        defaults:  [json],
        body:      [json],
        appId:     [uuid],
        createdBy: [string]
        createdAt: [int64],
        updatedAt: [int64]
      }
      
  • Error Response

    It will return an error if no x-forwarded-email header is specified

    • Code: 401

    • Code: 500

    • Content:

      {
        "reason": [string]
      }
      

Update Template

PUT /apps/:appId/templates/:templateId

Updates the template that has id templateId.

  • Payload

    {
      name:      [string],
      locale:    [string],
      defaults:  [json],   // cannot be empty
      body:      [json]   // cannot be empty
    }
    
  • Success Response

    • Code: 201

    • Content:

      {  
        id:        [uuid],
        name:      [string],
        locale:    [string],
        defaults:  [json],  
        body:      [json],  
        appId:     [uuid],
        createdBy: [string],
        createdAt: [int64],
        updatedAt: [int64]  
      }
      
  • Error Response

    It will return an error if no x-forwarded-email header is specified

    • Code: 401

    It will return an error if there is already a template with the same appId, name and locale.

    • Code: 409

    • Content:

      {
        "reason": [string]
      }
      

    It will return an error if there are missing or invalid parameters.

    • Code: 422

    • Content:

      {
        "reason": [string]
      }
      
    • Code: 500

    • Content:

      {
        "reason": [string]
      }
      

Delete Template

DELETE /apps/:appId/templates/:templateId

Deletes the templaye that has id templateId.

  • Success Response

    • Code: 204
  • Error Response

    It will return an error if no x-forwarded-email header is specified

    • Code: 401

    It will return an error if there are missing or invalid parameters.

    • Code: 422

    • Content:

      {
        "reason": [string]
      }
      
    • Code: 500

    • Content:

      {
        "reason": [string]
      }
      

Job Routes

List app jobs

GET /apps/:appId/jobs?template=<optional-template-name>

List all jobs for the app with the given id. If the template query string parameter is sent only jobs for the templates with this name will be returned.

  • Success Response

    • Code: 200

    • Content:

      [
        {  
          id:                  [uuid],
          totalBatches:        [null|int], // if null the total batches that will be sent was not calculated yet
          completedBatches:    [int],
          totalUsers:          [null|int], // if null the total users that will receive the push was not calculated yet
          totalTokens:         [null|int], // if null the total tokens that will receive the push was not calculated yet
          completedTokens:     [int],
          dbPageSize:          [int],    // page size that will be used for retrieving tokens from the database
          localized:           [boolean],
          completedAt:         [int64],  // nanoseconds since epoch,
          expiresAt:           [int64],  // nanoseconds since epoch, optional but if > 0 push will no longer be sent after this timestamp,
          startsAt:            [int64],  // nanoseconds since epoch, optional but if > 0 job was scheduled,
          context:             [json],   // optional
          service:             [gcm|apns],
          filters:             [json],   // optional
          metadata:            [json],   // optional
          csvPath:             [string], // full path of the S3 file with the csv containing users ids for this job,
          templateName:        [string], // can also be several strings separated by commas
          pastTimeStrategy:    [null|string], // null if job is not localized or one of [skip, nextDay]
          status:              [null|string], // null if job is running or one of [paused, stopped, circuitbreak]
          appId:               [uuid],
          createdBy:           [string], // email
          createdAt:           [int64],  // nanoseconds since epoch
          updatedAt:           [int64],  // nanoseconds since epoch
          controlGroup:        [float],  // float between 0-1, represents the % of users that won't receive notifications
          controlGroupCsvPath: [string]  // full path of the S3 file with the csv containing users ids of users in the control group
        },
        {  
          id:                  [uuid],
          totalBatches:        [null|int],
          completedBatches:    [int],
          totalUsers:          [null|int],
          totalTokens:         [null|int],
          completedTokens:     [int],
          dbPageSize:          [int],   
          localized:           [boolean],
          completedAt:         [int64],
          expiresAt:           [int64],
          startsAt:            [int64],
          context:             [json],  
          service:             [gcm|apns],
          filters:             [json],  
          metadata:            [json],  
          csvPath:             [string],
          templateName:        [string],
          pastTimeStrategy:    [null|string],
          status:              [null|string],
          appId:               [uuid],
          createdBy:           [string],
          createdAt:           [int64],
          updatedAt:           [int64],
          controlGroup:        [float],
          controlGroupCsvPath: [string]
        },
        ...
      ]
      
  • Error Response

    It will return an error if no x-forwarded-email header is specified

    • Code: 401

    • Code: 500

    • Content:

      {
        "reason": [string]
      }
      

Create Job

POST /apps/:appId/jobs?template=<mandatory-template-name>

Creates a new job with the given parameters and template name. The template name can be composed of several template names separated by commas. Example POST /apps/:appId/jobs?template=tpl1,tpl2,tpl3,tpl4. In this case the template messages will be randomly chosen for each user using a uniform distribution.

  • Payload

    {
      localized:        [boolean],
      expiresAt:        [int64],  // nanoseconds since epoch, optional but if > 0 push will no longer be sent after this timestamp,
      startsAt:         [int64],  // nanoseconds since epoch, optional but if > 0 job was scheduled,
      context:          [json],   // optional
      service:          [gcm|apns],
      filters:          [json],   // optional
      metadata:         [json],   // optional
      csvPath:          [string], // full path of the S3 file with the csv containing users ids for this job,
      pastTimeStrategy: [null|string], // null if job is not localized or one of [skip, nextDay]
      controlGroup:     [float]   // float between 0-1, represents the % of users that won't receive notifications
    }
    
  • Success Response

    • Code: 201

    • Content:

      {  
        id:               [uuid],
        totalBatches:     [null|int],
        completedBatches: [int],
        totalUsers:       [null|int],
        completedUsers:   [int],
        completedTokens:  [int],
        dbPageSize:       [int],   
        localized:        [boolean],
        completedAt:      [int64],
        expiresAt:        [int64],
        startsAt:         [int64],
        context:          [json],  
        service:          [gcm|apns],
        filters:          [json],  
        metadata:         [json],  
        csvPath:          [string],
        templateName:     [string],
        pastTimeStrategy: [null|string],
        status:           [null|string],
        appId:            [uuid],
        createdBy:        [string],
        createdAt:        [int64],
        updatedAt:        [int64],
        controlGroup:        [float],
        controlGroupCsvPath: [string]
      }
      
  • Error Response

    It will return an error if no x-forwarded-email header is specified

    • Code: 401

    It will return an error if there are missing or invalid parameters.

    • Code: 422

    • Content:

      {
        "reason": [string]
      }
      
    • Code: 500

    • Content:

      {
        "reason": [string]
      }
      

Retrieve Job

GET /apps/:appId/jobs/:jobId

Retrieves the app that has id jobId.

  • Success Response

    • Code: 200

    • Content:

      {  
        id:               [uuid],
        totalBatches:     [null|int],
        completedBatches: [int],
        totalUsers:       [null|int],
        completedUsers:   [int],
        completedTokens:  [int],
        dbPageSize:       [int],   
        localized:        [boolean],
        completedAt:      [int64],
        expiresAt:        [int64],
        startsAt:         [int64],
        context:          [json],  
        service:          [gcm|apns],
        filters:          [json],  
        metadata:         [json],  
        csvPath:          [string],
        templateName:     [string],
        pastTimeStrategy: [null|string],
        status:           [null|string],
        appId:            [uuid],
        createdBy:        [string],
        createdAt:        [int64],
        updatedAt:        [int64],
        controlGroup:        [float],
        controlGroupCsvPath: [string]
      }
      
  • Error Response

    It will return an error if no x-forwarded-email header is specified

    • Code: 401

    • Code: 500

    • Content:

      {
        "reason": [string]
      }
      

Pause Job

PUT /apps/:appId/jobs/:jobId/pause

Pauses the job that has id jobId.

  • Payload

    {}
    
  • Success Response

    • Code: 201

    • Content:

      {  
        id:               [uuid],
        totalBatches:     [null|int],
        completedBatches: [int],
        totalUsers:       [null|int],
        completedUsers:   [int],
        completedTokens:  [int],
        dbPageSize:       [int],   
        localized:        [boolean],
        completedAt:      [int64],
        expiresAt:        [int64],
        startsAt:         [int64],
        context:          [json],  
        service:          [gcm|apns],
        filters:          [json],  
        metadata:         [json],  
        csvPath:          [string],
        templateName:     [string],
        pastTimeStrategy: [null|string],
        status:           "paused",
        appId:            [uuid],
        createdBy:        [string],
        createdAt:        [int64],
        updatedAt:        [int64],
        controlGroup:        [float],
        controlGroupCsvPath: [string]
      }
      
  • Error Response

    It will return an error if no x-forwarded-email header is specified

    • Code: 401

    It will return an error if there are missing or invalid parameters or if the job previous status was not null.

    • Code: 422

    • Content:

      {
        "reason": [string]
      }
      
    • Code: 500

    • Content:

      {
        "reason": [string]
      }
      

Stop Job

PUT /apps/:appId/jobs/:jobId/pause

Stops the job that has id jobId.

  • Payload

    {}
    
  • Success Response

    • Code: 201

    • Content:

      {  
        id:               [uuid],
        totalBatches:     [null|int],
        completedBatches: [int],
        totalUsers:       [null|int],
        completedUsers:   [int],
        completedTokens:  [int],
        dbPageSize:       [int],   
        localized:        [boolean],
        completedAt:      [int64],
        expiresAt:        [int64],
        startsAt:         [int64],
        context:          [json],  
        service:          [gcm|apns],
        filters:          [json],  
        metadata:         [json],  
        csvPath:          [string],
        templateName:     [string],
        pastTimeStrategy: [null|string],
        status:           "stopped",
        appId:            [uuid],
        createdBy:        [string],
        createdAt:        [int64],
        updatedAt:        [int64],
        controlGroup:        [float],
        controlGroupCsvPath: [string]
      }
      
  • Error Response

    It will return an error if no x-forwarded-email header is specified

    • Code: 401

    It will return an error if there are missing or invalid parameters.

    • Code: 422

    • Content:

      {
        "reason": [string]
      }
      
    • Code: 500

    • Content:

      {
        "reason": [string]
      }
      

Resume Job

PUT /apps/:appId/jobs/:jobId/resume

Resumes the job that has id jobId.

  • Payload

    {}
    
  • Success Response

    • Code: 201

    • Content:

      {  
        id:               [uuid],
        totalBatches:     [null|int],
        completedBatches: [int],
        totalUsers:       [null|int],
        completedUsers:   [int],
        completedTokens:  [int],
        dbPageSize:       [int],   
        localized:        [boolean],
        completedAt:      [int64],
        expiresAt:        [int64],
        startsAt:         [int64],
        context:          [json],  
        service:          [gcm|apns],
        filters:          [json],  
        metadata:         [json],  
        csvPath:          [string],
        templateName:     [string],
        pastTimeStrategy: [null|string],
        status:           null,
        appId:            [uuid],
        createdBy:        [string],
        createdAt:        [int64],
        updatedAt:        [int64],
        controlGroup:        [float],
        controlGroupCsvPath: [string]
      }
      
  • Error Response

    It will return an error if no x-forwarded-email header is specified

    • Code: 401

    It will return an error if there are missing or invalid parameters or if the job previous status was not [paused|circuitbreak].

    • Code: 422

    • Content:

      {
        "reason": [string]
      }
      
    • Code: 500

    • Content:

      {
        "reason": [string]
      }
      

Marathon Workers

Overview

Depending on the job type, different workers can be called. See the image below:

_images/Workers.pngexample

This image shows the workers execution order. Also, you can see each data that go between the workers.

When we have a scheduled job with multiple timezones, multiple jobs will be created and scheduled. Each job will be processed within 1 hour intervals, for example, if a user is at the +1215 timezone he will receive a message when the +1200 is processed.

Workers

Direct Worker

The API is responsible to create batches using an estimative of the database size. This worker process these batches. It will query the PUSH_DB using the job filters, creates the messages and send to Kafka. This worker is really fast and can handle big amount of tokens (tested with 1.5x10^8 tokens).

If a control group is set, it will be saved on Redis. The completed job worker will pull this data and create a CSV with the control group ids.

It will not generate a CSV of the sent messages.

This worker will produce two metrics:

  • starting_direct_part: represents when the worker starts;
  • get_from_pg represent the spent time on retrieving data from the database.

After processing all parts, the Job Completed Worker is called. To know if all batches are completed, a counter is the Redis is used.

CSV Split Worker

This worker downloads a CSV file from AWS S3, reads it size and splits it in small batches. Only one worker will do this job.

After creating each batch, it will send csv_job_part metric.

Create Batches Worker

This worker downloads a part of the CSV file from AWS S3, reads it and creates batches of user information (locale, token, tz).

Process Batch Worker

This worker receives a batch of user information (locale and token), builds the template for each user using the locale information and the job template name and send the message to the Kafka topic corresponding to the job app and service. If the error rate is more than a threshold this job enters circuit break state. When the job is paused or in circuit break, the batches are stored in a paused job list in Redis with an expiration of one week.

When this work starts, it sends the starting_process_batch_worker metric.

The metric send_message_return is reported when messages are delivered to Kafka, either successfully or with errors.

The batches are defined by a position in the CSV and the number of bytes to read from that position. Because of that, during the batches creation, some IDs can be divided and have it beginning in one batch and the end in other. To recovery this IDs, each process batch worker will search for the first \n in the file and will considerate all the bytes before this marker has part of one split ID. The method is similar at the end of the file, it will considerate all bytes since the las \n as part of another split ID. It will save this information in the Redis and after all, batches are processed, the last worker will retrieve this information, join the splits IDs and send the messages.

To know if all batches are completed, a counter is the Redis is used.

Job Completed Worker

When all Process Batch Workers or all Direct Workers is completed, they will call this worker.

This worker will send one email saying the job is completed.

Also, it reads the Redis and creates the control group CSV.

Resume Job Worker

This worker handles jobs that are paused or in circuit break state. It removes a batch from the paused job list and calls the process batch worker for each one of them until are has no more paused batches.

Metrics

Each one of the workers has metrics indicating the start (e.g. starting_create_batches_worker), the completion (e.g. completed_create_batches_worker) and possible execution errors (e.g. error_create_batches_worker). The error metrics are only generated for internal errors or input validation errors, for instance when no valid IDs are present in the CSV file. In every other case, the workers will generate the completed metric.

Marathon Feedbacks

After a message is sent to the kafka topic corresponding to the job app and service, another system will attempt to send the push notification to APNS or GCM and write a feedback of this successful or failed push notification in another queue, the feedback kafka.

Marathon has another command, besides the API and the workers, that starts a feedback listener that reads from this kafka’s topics and update the job’s feedback column in the PostgreSQL database. The messages in this queue contain all metadata sent by marathon including the job id.

Feedbacks column

The feedbacks column contains a JSON in the following format:

{
  "ack":        <int>,  // count
  "error-key1": <int>,  // count
  "error-key2": <int>   // count
  ...
}

In the case of successful push notifications the key is ack. For failed push notifications the key will be the error reason received from APNS or GCM, for example BAD_REGISTRATION, unregistered, etc.

To avoid updating the job entry in the PostgreSQL database for every message received in the feedbacks kafka, we update the database periodically (defaults to every 5 seconds) by using a local cache to store all feedbacks received in the mean time.

Indices and tables