Pub / Sub

Authentication / Configuration

  • Use Client objects to configure your applications.

  • In addition to any authentication configuration, you should also set the GOOGLE_CLOUD_PROJECT environment variable for the project you’d like to interact with. If you are running on Google App Engine or Google Compute Engine, the project will be detected automatically.

  • The library now enables the gRPC transport for the pubsub API by default, assuming that the required dependencies are installed and importable. To disable this transport, set the GOOGLE_CLOUD_DISABLE_GRPC environment variable to a non-empty string, e.g.: $ export GOOGLE_CLOUD_DISABLE_GRPC=true.

  • Client objects hold both a project and an authenticated connection to the PubSub service.

  • The authentication credentials can be determined implicitly from the environment or supplied directly via from_service_account_json and from_service_account_p12.

  • After setting the GOOGLE_APPLICATION_CREDENTIALS and GOOGLE_CLOUD_PROJECT environment variables, create a Client:

    >>> from google.cloud import pubsub
    >>> client = pubsub.Client()
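
The environment-based configuration described above can also be prepared from Python before the client is created. The following is a minimal sketch, not part of the library: configure_pubsub_environment is a hypothetical helper and "my-project-id" is a placeholder; only the two environment variable names come from the text above. Exactly when the library reads each variable is an assumption here, so the safest order is to set them before importing google.cloud.pubsub.

```python
import os


def configure_pubsub_environment(project, disable_grpc=False):
    """Set the environment variables described above (hypothetical helper)."""
    os.environ["GOOGLE_CLOUD_PROJECT"] = project
    if disable_grpc:
        # Any non-empty string disables the gRPC transport.
        os.environ["GOOGLE_CLOUD_DISABLE_GRPC"] = "true"
    else:
        # Remove the variable so the default (gRPC enabled) applies.
        os.environ.pop("GOOGLE_CLOUD_DISABLE_GRPC", None)


# "my-project-id" is a placeholder project name.
configure_pubsub_environment("my-project-id", disable_grpc=True)
```

Setting the variables in the shell, as shown in the bullet on gRPC, has the same effect and avoids any dependence on import order.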
    

Manage topics for a project

List topics for the default project:

    for topic in client.list_topics():   # API request(s)
        do_something_with(topic)

Create a new topic for the default project:

    topic = client.topic(TOPIC_NAME)
    topic.create()              # API request

Check for the existence of a topic:

    assert not topic.exists()   # API request
    topic.create()              # API request
    assert topic.exists()       # API request

Delete a topic:

    assert topic.exists()       # API request
    topic.delete()              # API request
    assert not topic.exists()   # API request

Fetch the IAM policy for a topic:

    policy = topic.get_iam_policy()             # API request

Update the IAM policy for a topic:

    ALL_USERS = policy.all_users()
    policy.viewers = [ALL_USERS]
    LOGS_GROUP = policy.group('cloud-logs@google.com')
    policy.editors = [LOGS_GROUP]
    new_policy = topic.set_iam_policy(policy)   # API request

Test permissions allowed by the current IAM policy on a topic:

    from google.cloud.pubsub.iam import OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE
    TO_CHECK = [OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE]
    ALLOWED = topic.check_iam_permissions(TO_CHECK)
    assert set(ALLOWED) == set(TO_CHECK)
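
When the assertion above fails, it helps to know which permissions were not granted. The following is a small sketch under stated assumptions: missing_permissions is a hypothetical helper, and the permission strings are placeholders rather than the values of the library's role constants. It relies only on check_iam_permissions returning the allowed subset of what was requested, as the example above implies.

```python
def missing_permissions(requested, allowed):
    """Return the requested permissions that were not granted, sorted."""
    return sorted(set(requested) - set(allowed))


# Placeholder strings standing in for TO_CHECK and ALLOWED.
requested = ["permission-a", "permission-b", "permission-c"]
allowed = ["permission-b"]

missing = missing_permissions(requested, allowed)
```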

Publish messages to a topic

Publish a single message to a topic, without attributes:

    topic.publish(b'This is the message payload')               # API request

Publish a single message to a topic, with attributes:

    topic.publish(b'Another message payload', extra='EXTRA')    # API request

Publish a set of messages to a topic (as a single request):

    with topic.batch() as batch:
        batch.publish(PAYLOAD1)
        batch.publish(PAYLOAD2, extra=EXTRA)

Note

The only API request happens during the __exit__() of the batch used as a context manager, and only if the block exits without raising an exception.
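
The deferred-send behaviour described in the note can be illustrated without the library. This is a minimal sketch, not the library's implementation: SketchBatch and its send callable are hypothetical stand-ins. It shows messages being queued locally and sent as one request only when the with block exits cleanly.

```python
class SketchBatch:
    """Stand-in for a publish batch; not the library's implementation."""

    def __init__(self, send):
        self._send = send      # callable that performs the single request
        self.messages = []

    def publish(self, payload, **attributes):
        # Only queue the message locally; nothing is sent yet.
        self.messages.append((payload, attributes))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Send the queued messages only if the block exited cleanly.
        if exc_type is None:
            self._send(list(self.messages))
        # Returning None (falsy) lets any exception propagate.


sent_requests = []

# Clean exit: both messages go out together as one request.
with SketchBatch(sent_requests.append) as batch:
    batch.publish(b"first payload")
    batch.publish(b"second payload", extra="EXTRA")

# Exit via exception: the queued message is never sent.
try:
    with SketchBatch(sent_requests.append) as batch:
        batch.publish(b"never sent")
        raise RuntimeError("processing failed")
except RuntimeError:
    pass
```

After this runs, sent_requests holds a single entry containing the two messages from the first block.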

Manage subscriptions to topics

List all subscriptions for the default project:

    for subscription in client.list_subscriptions():  # API request(s)
        do_something_with(subscription)

List subscriptions for a topic:

    for subscription in topic.list_subscriptions():   # API request(s)
        do_something_with(subscription)

Create a new pull subscription for a topic, with defaults:

    sub_defaults = topic.subscription(SUB_DEFAULTS)
    sub_defaults.create()               # API request

Create a new pull subscription for a topic with a non-default ACK deadline:

    sub_ack90 = topic.subscription(SUB_ACK90, ack_deadline=90)
    sub_ack90.create()                  # API request

Create a new push subscription for a topic:

    subscription = topic.subscription(SUB_PUSH, push_endpoint=PUSH_URL)
    subscription.create()               # API request

Check for the existence of a subscription:

    assert subscription.exists()                        # API request

Convert a pull subscription to push:

    subscription.modify_push_configuration(
        push_endpoint=PUSH_URL)                                 # API request

Convert a push subscription to pull:

    subscription.modify_push_configuration(push_endpoint=None)  # API request

Re-synchronize a subscription with the back-end:

    subscription.reload()                               # API request

Fetch the IAM policy for a subscription:

    policy = subscription.get_iam_policy()             # API request

Update the IAM policy for a subscription:

    ALL_USERS = policy.all_users()
    policy.viewers = [ALL_USERS]
    LOGS_GROUP = policy.group('cloud-logs@google.com')
    policy.editors = [LOGS_GROUP]
    new_policy = subscription.set_iam_policy(policy)   # API request

Test permissions allowed by the current IAM policy on a subscription:

    from google.cloud.pubsub.iam import OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE
    TO_CHECK = [OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE]
    ALLOWED = subscription.check_iam_permissions(TO_CHECK)
    assert set(ALLOWED) == set(TO_CHECK)

Delete a subscription:

    subscription.delete()                               # API request

Pull messages from a subscription

Fetch pending messages for a pull subscription:

    pulled = subscription.pull(max_messages=2)

Note that received messages must be acknowledged, or else the back-end will re-send them later:

    for ack_id, message in pulled:
        try:
            do_something_with(message)
        except ApplicationException as e:
            log_exception(e)
        else:
            subscription.acknowledge([ack_id])
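
Because acknowledge accepts a list of ack IDs, the successful messages can also be acknowledged in one call after the loop. The following is a sketch under stated assumptions: FakeSubscription, process_and_acknowledge, and handler are hypothetical names used so the example runs without the service; only the acknowledge(list_of_ack_ids) call shape is taken from the example above.

```python
class FakeSubscription:
    """Stand-in that records acknowledge calls; not the library's class."""

    def __init__(self):
        self.acknowledged = []

    def acknowledge(self, ack_ids):
        self.acknowledged.append(list(ack_ids))


def process_and_acknowledge(subscription, pulled, handler):
    """Handle each message, then acknowledge the successes in one call."""
    succeeded = []
    for ack_id, message in pulled:
        try:
            handler(message)
        except Exception:  # broad on purpose: any failure means "do not ack"
            continue
        succeeded.append(ack_id)
    if succeeded:
        subscription.acknowledge(succeeded)
    return succeeded


def handler(message):
    # Hypothetical processing step that rejects one message.
    if message == "bad":
        raise ValueError("cannot process")


fake = FakeSubscription()
pulled = [("id-1", "good"), ("id-2", "bad"), ("id-3", "good")]
acked = process_and_acknowledge(fake, pulled, handler)
```

Messages whose handler raised are left unacknowledged, so the back-end will re-send them later.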

Fetch messages for a pull subscription without blocking (the call returns immediately even if none are pending):

    pulled = subscription.pull(return_immediately=True)

Update the acknowledgement deadline for pulled messages:

    for ack_id, _ in pulled:
        subscription.modify_ack_deadline(ack_id, 90)    # API request

Fetch pending messages, acknowledging those whose processing doesn’t raise an error:

    from google.cloud.pubsub.subscription import AutoAck
    with AutoAck(subscription, max_messages=10) as ack:
        for ack_id, message in list(ack.items()):
            try:
                do_something_with(message)
            except Exception:  # pylint: disable=broad-except
                del ack[ack_id]

Note

The pull API request occurs at entry to the with block, and the acknowledge API request occurs at the end, passing only the ack_ids which haven’t been deleted from ack.
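
The pull-at-entry, acknowledge-at-exit pattern in the note can be sketched as a dict-based context manager. This is an illustration only, not the library's AutoAck: SketchAutoAck and FakeSubscription are hypothetical stand-ins, and the fake pull simply returns (ack_id, message) pairs as in the examples above.

```python
class FakeSubscription:
    """Stand-in that serves canned messages and records acknowledgements."""

    def __init__(self, pending):
        self._pending = pending
        self.acknowledged = []

    def pull(self, max_messages=1):
        return self._pending[:max_messages]

    def acknowledge(self, ack_ids):
        self.acknowledged.append(sorted(ack_ids))


class SketchAutoAck(dict):
    """Maps ack_id to message; acknowledges whatever remains at exit."""

    def __init__(self, subscription, **pull_kwargs):
        super().__init__()
        self._subscription = subscription
        self._pull_kwargs = pull_kwargs

    def __enter__(self):
        # Pull at entry and index the messages by ack_id.
        self.update(self._subscription.pull(**self._pull_kwargs))
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Acknowledge only the ack_ids that were not deleted in the block.
        if self:
            self._subscription.acknowledge(list(self))


fake = FakeSubscription([("id-1", "ok"), ("id-2", "boom"), ("id-3", "ok")])

with SketchAutoAck(fake, max_messages=10) as ack:
    for ack_id, message in list(ack.items()):
        if message == "boom":   # simulate a processing failure
            del ack[ack_id]
```

Iterating over list(ack.items()) rather than ack.items() matters here, since entries are deleted from the mapping during the loop.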