How can I add an AI moderation service to a chat built with Ably?

This is how you can integrate a third-party content moderation provider into your Ably workflow.

There are many ways to do this, but broadly speaking, moderation falls into two approaches:

  • Synchronously: before you publish the message to Ably. This approach prevents harmful content from ever being published to an Ably channel and received by other users, which is useful where schools or regulatory requirements are involved. The downside is that the moderation service becomes a potential bottleneck in your message pipeline.
  • Asynchronously: after a message has been published. This is sometimes preferred in large-scale chat rooms where community standards are less restrictive, because messages continue to be posted without depending on a moderation service keeping up with demand.

Asynchronous Moderation

Asynchronous moderation can be achieved either with a service that is connected to the channel, or with a channel integration rule that forwards messages to a service you run.

In this model, your clients send messages over an Ably channel, and a channel rule forwards each message to your service. If the moderation result is negative, your service publishes another message that instructs clients to delete or hide the content.
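
The last step of this flow, where clients react to a moderation result, could be sketched on the client side as follows. This is a minimal illustration in plain Python with no Ably SDK calls. The `ChatView` class and its method names are hypothetical, and the payload fields (`moderation`, `message_id`) are assumed to match the moderation response published by the Lambda function in this article.

```python
import json


class ChatView:
    """Hypothetical in-memory view of the messages a client is displaying."""

    def __init__(self):
        self.visible = {}    # message id -> text currently shown
        self.hidden = set()  # ids flagged by moderation

    def on_chat_message(self, message_id, text):
        # A flag can arrive before the message it refers to, so check first
        if message_id not in self.hidden:
            self.visible[message_id] = text

    def on_moderation_response(self, data):
        # Accept either a decoded dict or a JSON string payload
        payload = json.loads(data) if isinstance(data, str) else data
        if payload.get('moderation'):
            self.hidden.add(payload['message_id'])
            self.visible.pop(payload['message_id'], None)


view = ChatView()
view.on_chat_message('msg-1', 'hello')
view.on_chat_message('msg-2', 'something abusive')
view.on_moderation_response({'moderation': True, 'message_id': 'msg-2', 'was_banned': True})
print(view.visible)
```

Remembering flagged ids as well as removing the visible copy means a message stays hidden even if the flag and the message arrive in the opposite order.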

Here is some sample code for an AWS Lambda function that performs asynchronous moderation of messages:

Python

import asyncio
import boto3
import os
import requests
import json
import time
from ably import AblyRest
from ably.types.message import Message


# Use the AWS systems manager extension to get parameters from SSM, as opposed to boto3
def get_parameter_store_value(param) -> str:
    headers = {
        "X-Aws-Parameters-Secrets-Token": os.environ.get('AWS_SESSION_TOKEN')
    }
    response = requests.get(
        'http://localhost:2773/systemsmanager/parameters/get/?name=' + param
        + "&withDecryption=true", headers=headers
    ).json()
    return response['Parameter']['Value']

async def message_requires_moderation(message: str) -> bool:
    # Your moderation code here
    return True


# Store the banned user in DynamoDB so a token-issuing function can check it
async def mark_user_as_banned(client_id: str) -> None:
    dynamodb = boto3.client('dynamodb')
    item = {
        'clientId': {'S': client_id},
        'ttl': {'N': str(int(time.time()) + 120)}  # 2 minutes
    }

    dynamodb.put_item(
        TableName='bannedUsers',
        Item=item
    )


# Revoke the banned user's Ably token, so that they can no longer publish messages
async def revoke_access_token(client_id: str) -> None:
    await mark_user_as_banned(client_id)

    ably_token_key = get_parameter_store_value('/ably/chat/prototype/token-request-api-key')
    ably = AblyRest(ably_token_key)
    print("Revoking token for clientId " + client_id)
    # Create the path
    token_revocation_path = '/keys/' + ably_token_key.split(':')[0] + '/revokeTokens'

    # Create the body
    token_revocation_body = {
        'targets': ['clientId:' + client_id]
    }

    # Send it
    await ably.http.post(token_revocation_path, body=token_revocation_body)


# Send a message to the channel to say that moderation has happened
# Skip integration rules on the channel so the message isn't sent back to us
async def send_moderation_response(message_id: str, channel: str, timeserial: str) -> None:
    ably = AblyRest(get_parameter_store_value('/ably/chat/prototype/moderation-publish-api-key'))

    moderation_response = {
        'moderation': True,
        'message_id': message_id,
        'was_banned': True
    }
    moderation_extras = {
        'ref': {
            'type': 'com.ably.moderation.response',
            'timeserial': timeserial
        },
        'privileged': {
            'skipRule': '*'
        }
    }

    print('Sending moderation response: ' + json.dumps(moderation_response))
    await ably.channels.get(channel) \
        .publish(Message(name='message', data=moderation_response, extras=moderation_extras))


# Moderate the message
async def moderate_message(channel: str, message: dict) -> None:
    message_data = json.loads(message['data'])

    if await message_requires_moderation(message_data['message']):
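        # gather() accepts coroutines directly; passing bare coroutines to
        # asyncio.wait() is no longer supported in recent Python versions
        await asyncio.gather(
            revoke_access_token(message['clientId']),
            send_moderation_response(message['id'], channel, message['extras']['timeserial'])
        )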


# Receive messages from an Ably SQS rule and process them
async def process_messages(event):
    processing_messages = []
    for record in event['Records']:
        event_body = json.loads(record['body'])
        channel_name = event_body['channel']
        for message in event_body['messages']:
            processing_messages.append(moderate_message(channel_name, message))

    # Run all moderation tasks concurrently (also safe when the list is empty)
    await asyncio.gather(*processing_messages)
    return {'statusCode': 200}


def main(event, context):
    result = asyncio.get_event_loop().run_until_complete(asyncio.gather(process_messages(event)))
    return result[0]
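
To test the handler locally, it helps to see the event shape that `process_messages` expects. The sketch below builds a sample event and unpacks it in the same way. Only the fields that the code above reads (`Records`, `body`, `channel`, `messages`, `id`, `clientId`, `data` and `extras.timeserial`) are included, every value is made up, and a real event from an Ably SQS rule may carry additional fields.

```python
import json


def build_sample_event(channel, messages):
    # The handler reads each SQS record's 'body' as a JSON-encoded envelope
    envelope = {'channel': channel, 'messages': messages}
    return {'Records': [{'body': json.dumps(envelope)}]}


def iter_messages(event):
    # Mirrors the nested loops in process_messages, without calling Ably
    for record in event['Records']:
        envelope = json.loads(record['body'])
        for message in envelope['messages']:
            yield envelope['channel'], message


sample_message = {
    'id': 'example-message-id',
    'clientId': 'example-client',
    # moderate_message expects 'data' to be a JSON string with a 'message' key
    'data': json.dumps({'message': 'hello world'}),
    'extras': {'timeserial': 'example-timeserial'},
}
event = build_sample_event('example-channel', [sample_message])

for channel, message in iter_messages(event):
    text = json.loads(message['data'])['message']
    print(channel, message['clientId'], text)
```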

Synchronous Moderation

One way to achieve synchronous moderation is to first send the message to your own servers, or directly to a third-party provider. Once the message has passed moderation, it can be forwarded to Ably, taking advantage of the “publish on behalf of” feature.

Here is some example code for an AWS Lambda function that takes a message payload, sends it for moderation and then forwards it to Ably:

Python

import json
import asyncio
import boto3
from ably import AblyRest
from ably.types.message import Message

ssm = boto3.client('ssm')
publishKey = ssm.get_parameter(Name='/ably/chat/publish-key', WithDecryption=True)
ablyRest = AblyRest(publishKey['Parameter']['Value'])


async def publish_message(event):
    try:
        body = json.loads(event['body'])
        # Your moderation code here. True is a placeholder meaning "passed";
        # set this to False when the message fails moderation.
        moderationResponse = True

        # If moderation fails, ban the user and let the channel know
        if not moderationResponse:
            await ablyRest.channels.get(body['channel']).publish('banned_user', '123')
            body = json.dumps({
                "message": "Message was moderated for abuse, you have been banned.",
            })
            return {"statusCode": 400, "body": body}

        # Send the message to the Ably channel on behalf of the user
        await ablyRest.channels.get(body['channel'], connection_key=body['connection_key']).publish(
            Message(name='message', data=body['message'], connection_key=body['connection_key']))
    except AttributeError:
        return {"statusCode": 400}

    # Build the success response
    body = json.dumps({
        "message": "Message received successfully",
    })
    return {"statusCode": 200, "body": body}


def main(event, context):
    result = asyncio.get_event_loop().run_until_complete(asyncio.gather(publish_message(event)))
    return result[0]
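
The "Your moderation code here" placeholder is where the call to your moderation provider goes. As a stand-in for local testing, a check could be as simple as the sketch below. The blocklist, the `message_passes_moderation` name and the whole-word matching rule are all hypothetical; a real integration would call your chosen provider and interpret its response instead.

```python
import re

# Hypothetical blocklist used only for local testing
BLOCKED_TERMS = {'spamword', 'badword'}


def message_passes_moderation(text: str) -> bool:
    """Return True if none of the blocked terms appear as whole words."""
    words = set(re.findall(r"[a-z0-9']+", text.lower()))
    return words.isdisjoint(BLOCKED_TERMS)


print(message_passes_moderation('Hello everyone'))
print(message_passes_moderation('This is a BADWORD example'))
```

In the handler above, `moderationResponse` could then be set from `message_passes_moderation(body['message'])`.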

You can also use an inbox-outbox pattern with Ably channels to achieve the same effect. In this scenario, all of your clients are able to publish to an outbox channel for unmoderated messages, as well as subscribe to the inbox channel, where moderated messages are re-published.

When a message is submitted, you first send the unmoderated message to an outbox channel on Ably. From there, the message can be directed to a service that you run, using one of Ably’s many options for channel integration rules.

Once the service has performed the moderation, the message can then be forwarded to the inbox channel, to be received by your clients.
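
The service in the middle of this pattern could be sketched as follows. The channel names, the `relay_message` function and the injected `moderate` and `publish` callables are hypothetical stand-ins. In a real service, `publish` would wrap a call to Ably and `moderate` a call to your moderation provider.

```python
from typing import Callable, Optional

OUTBOX_CHANNEL = 'chat:outbox'  # hypothetical name: clients publish unmoderated messages here
INBOX_CHANNEL = 'chat:inbox'    # hypothetical name: clients subscribe to moderated messages here


def relay_message(
    source_channel: str,
    text: str,
    moderate: Callable[[str], bool],
    publish: Callable[[str, str], None],
) -> Optional[str]:
    """Re-publish a message to the inbox channel if it passes moderation.

    Returns the channel it was published to, or None if it was dropped.
    """
    if source_channel != OUTBOX_CHANNEL:
        return None  # ignore anything that did not come from the outbox
    if not moderate(text):
        return None  # failed moderation: never reaches subscribers
    publish(INBOX_CHANNEL, text)
    return INBOX_CHANNEL


published = []
relay_message(OUTBOX_CHANNEL, 'hello', lambda t: 'abuse' not in t,
              lambda channel, t: published.append((channel, t)))
relay_message(OUTBOX_CHANNEL, 'some abuse', lambda t: 'abuse' not in t,
              lambda channel, t: published.append((channel, t)))
print(published)
```

Because clients only subscribe to the inbox channel, a message that fails moderation is simply never re-published.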

Please note that in the inbox-outbox pattern, you cannot necessarily guarantee that messages will be moderated in the same order in which they are received, unless you use something that guarantees ordering, such as a FIFO queue.
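
If moderation results can come back out of order, another option is to buffer the results and release them in sequence. The sketch below assumes each message is given an increasing integer sequence number when it is received, starting at 0. That numbering and the `Resequencer` class are hypothetical, not something Ably provides in this form. It also assumes every message eventually receives a verdict, since otherwise later messages stay buffered.

```python
class Resequencer:
    """Releases moderation results in the order the messages were received."""

    def __init__(self):
        self.next_seq = 0   # sequence number we are waiting to release
        self.pending = {}   # seq -> (text, passed) for results that arrived early

    def add_result(self, seq, text, passed):
        """Record a moderation result and return the texts now safe to publish."""
        self.pending[seq] = (text, passed)
        ready = []
        while self.next_seq in self.pending:
            ready_text, ready_passed = self.pending.pop(self.next_seq)
            if ready_passed:
                ready.append(ready_text)  # rejected messages are skipped
            self.next_seq += 1
        return ready


resequencer = Resequencer()
print(resequencer.add_result(1, 'second', True))
print(resequencer.add_result(0, 'first', True))
print(resequencer.add_result(2, 'third', False))
```

The trade-off is latency: one slow moderation call holds back every message behind it, which is the same bottleneck a FIFO queue introduces.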

 

Read more about other chat features you can create with Ably