BigQuery#

Note

This library changed significantly before the 1.0.0 release, especially between versions 0.27 and 0.28. See Migrating from the BigQuery Python client library version 0.27 for instructions on how to migrate your code to the most recent version of this library.

Authentication / Configuration#

  • Use Client objects to configure your applications.

  • Client objects hold both a project and an authenticated connection to the BigQuery service.

  • The authentication credentials can be implicitly determined from the environment or directly via from_service_account_json and from_service_account_p12.

  • After setting GOOGLE_APPLICATION_CREDENTIALS and GOOGLE_CLOUD_PROJECT environment variables, create an instance of Client.

    >>> from google.cloud import bigquery
    >>> client = bigquery.Client()
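
    Credentials can also be supplied explicitly rather than inferred from the environment, for example with the from_service_account_json classmethod. A minimal sketch; the key-file path and project ID below are placeholders:

    >>> from google.cloud import bigquery
    >>> client = bigquery.Client.from_service_account_json(
    ...     '/path/to/keyfile.json', project='PROJECT_ID')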
    

Projects#

A project is the top-level container in the BigQuery API: it is tied closely to billing, and can provide default access control across all its datasets. If no project is passed to the client constructor, the library attempts to infer a project from the environment (including explicit environment variables, GAE, and GCE).

To override the project inferred from the environment, pass an explicit project to the constructor, or to either of the alternative classmethod factories:

>>> from google.cloud import bigquery
>>> client = bigquery.Client(project='PROJECT_ID')

Project ACLs#

Each project has an access control list granting reader / writer / owner permission to one or more entities. This list cannot be queried or set via the API; it must be managed using the Google Developer Console.

Datasets#

A dataset represents a collection of tables, and applies several default policies to tables as they are created:

  • An access control list (ACL). When created, a dataset has an ACL which maps to the ACL inherited from its project.
  • A default table expiration period. If set, tables created within the dataset use this value as their expiration period (see the sketch below).

See BigQuery documentation for more information on Datasets.
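
As an illustration of the second policy, a default table expiration can be set on the Dataset object before it is created. A minimal sketch, assuming a placeholder dataset ID and an arbitrary one-day expiration:

    # from google.cloud import bigquery
    # client = bigquery.Client()

    # 'my_new_dataset' is a placeholder dataset ID.
    dataset = bigquery.Dataset(client.dataset('my_new_dataset'))
    # Tables created in this dataset expire 24 hours after creation
    # (the value is given in milliseconds).
    dataset.default_table_expiration_ms = 24 * 60 * 60 * 1000
    dataset = client.create_dataset(dataset)  # API request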

Dataset operations#

List datasets for the client’s project:

    # from google.cloud import bigquery
    # client = bigquery.Client()

    datasets = list(client.list_datasets())
    project = client.project

    if datasets:
        print('Datasets in project {}:'.format(project))
        for dataset in datasets:  # API request(s)
            print('\t{}'.format(dataset.dataset_id))
    else:
        print('{} project does not contain any datasets.'.format(project))

Create a new dataset for the client’s project:

    # from google.cloud import bigquery
    # client = bigquery.Client()
    # dataset_id = 'my_dataset'

    # Create a DatasetReference using a chosen dataset ID.
    # The project defaults to the Client's project if not specified.
    dataset_ref = client.dataset(dataset_id)

    # Construct a full Dataset object to send to the API.
    dataset = bigquery.Dataset(dataset_ref)
    # Specify the geographic location where the dataset should reside.
    dataset.location = 'US'

    # Send the dataset to the API for creation.
    # Raises google.api_core.exceptions.AlreadyExists if the Dataset already
    # exists within the project.
    dataset = client.create_dataset(dataset)  # API request

Refresh metadata for a dataset (to pick up changes made by another client):

    # from google.cloud import bigquery
    # client = bigquery.Client()
    # dataset_id = 'my_dataset'

    dataset_ref = client.dataset(dataset_id)
    dataset = client.get_dataset(dataset_ref)  # API request

    # View dataset properties
    print('Dataset ID: {}'.format(dataset_id))
    print('Description: {}'.format(dataset.description))
    print('Labels:')
    for label, value in dataset.labels.items():
        print('\t{}: {}'.format(label, value))
    # View tables in dataset
    print('Tables:')
    tables = list(client.list_tables(dataset_ref))  # API request(s)
    if tables:
        for table in tables:
            print('\t{}'.format(table.table_id))
    else:
        print('\tThis dataset does not contain any tables.')

Update a property in a dataset’s metadata:

    # from google.cloud import bigquery
    # client = bigquery.Client()
    # dataset_ref = client.dataset('my_dataset')
    # dataset = client.get_dataset(dataset_ref)  # API request

    assert dataset.description == 'Original description.'
    dataset.description = 'Updated description.'

    dataset = client.update_dataset(dataset, ['description'])  # API request

    assert dataset.description == 'Updated description.'

Modify user permissions on a dataset:

    # from google.cloud import bigquery
    # client = bigquery.Client()
    # dataset = client.get_dataset(client.dataset('my_dataset'))

    entry = bigquery.AccessEntry(
        role='READER',
        entity_type='userByEmail',
        entity_id='sample.bigquery.dev@gmail.com')
    assert entry not in dataset.access_entries
    entries = list(dataset.access_entries)
    entries.append(entry)
    dataset.access_entries = entries

    dataset = client.update_dataset(dataset, ['access_entries'])  # API request

    assert entry in dataset.access_entries

Delete a dataset:

    # from google.cloud import bigquery
    # client = bigquery.Client()

    # Delete a dataset that does not contain any tables
    # dataset1_id = 'my_empty_dataset'
    dataset1_ref = client.dataset(dataset1_id)
    client.delete_dataset(dataset1_ref)  # API request

    print('Dataset {} deleted.'.format(dataset1_id))

    # Use the delete_contents parameter to delete a dataset and its contents
    # dataset2_id = 'my_dataset_with_tables'
    dataset2_ref = client.dataset(dataset2_id)
    client.delete_dataset(dataset2_ref, delete_contents=True)  # API request

    print('Dataset {} deleted.'.format(dataset2_id))

Tables#

Tables exist within datasets. See BigQuery documentation for more information on Tables.

Table operations#

List tables for the dataset:

    # from google.cloud import bigquery
    # client = bigquery.Client()
    # dataset_ref = client.dataset('my_dataset')

    tables = list(client.list_tables(dataset_ref))  # API request(s)
    assert len(tables) == 0

    table_ref = dataset_ref.table('my_table')
    table = bigquery.Table(table_ref)
    client.create_table(table)                      # API request
    tables = list(client.list_tables(dataset_ref))  # API request(s)

    assert len(tables) == 1
    assert tables[0].table_id == 'my_table'

Create a table:

    # from google.cloud import bigquery
    # client = bigquery.Client()
    # dataset_ref = client.dataset('my_dataset')

    schema = [
        bigquery.SchemaField('full_name', 'STRING', mode='REQUIRED'),
        bigquery.SchemaField('age', 'INTEGER', mode='REQUIRED'),
    ]
    table_ref = dataset_ref.table('my_table')
    table = bigquery.Table(table_ref, schema=schema)
    table = client.create_table(table)  # API request

    assert table.table_id == 'my_table'

Get a table:

    # from google.cloud import bigquery
    # client = bigquery.Client()
    # dataset_id = 'my_dataset'
    # table_id = 'my_table'

    dataset_ref = client.dataset(dataset_id)
    table_ref = dataset_ref.table(table_id)
    table = client.get_table(table_ref)  # API Request

    # View table properties
    print(table.schema)
    print(table.description)
    print(table.num_rows)

Update a property in a table’s metadata:

    # from google.cloud import bigquery
    # client = bigquery.Client()
    # table_ref = client.dataset('my_dataset').table('my_table')
    # table = client.get_table(table_ref)  # API request

    assert table.description == 'Original description.'
    table.description = 'Updated description.'

    table = client.update_table(table, ['description'])  # API request

    assert table.description == 'Updated description.'

Browse selected rows in a table:

    # from google.cloud import bigquery
    # client = bigquery.Client()

    dataset_ref = client.dataset('samples', project='bigquery-public-data')
    table_ref = dataset_ref.table('shakespeare')
    table = client.get_table(table_ref)  # API call

    # Load all rows from a table
    rows = client.list_rows(table)
    assert len(list(rows)) == table.num_rows

    # Load the first 10 rows
    rows = client.list_rows(table, max_results=10)
    assert len(list(rows)) == 10

    # Specify selected fields to limit the results to certain columns
    fields = table.schema[:2]  # first two columns
    rows = client.list_rows(table, selected_fields=fields, max_results=10)
    assert len(rows.schema) == 2
    assert len(list(rows)) == 10

    # Use the start index to load an arbitrary portion of the table
    rows = client.list_rows(table, start_index=10, max_results=10)

    # Print row data in tabular format
    format_string = '{!s:<16} ' * len(rows.schema)
    field_names = [field.name for field in rows.schema]
    print(format_string.format(*field_names))  # prints column headers
    for row in rows:
        print(format_string.format(*row))      # prints row data

Insert rows into a table’s data:

    # from google.cloud import bigquery
    # client = bigquery.Client()
    # dataset_id = 'my_dataset'  # replace with your dataset ID
    # For this sample, the table must already exist and have a defined schema.
    # table_id = 'my_table'  # replace with your table ID
    # table_ref = client.dataset(dataset_id).table(table_id)
    # table = client.get_table(table_ref)  # API request

    rows_to_insert = [
        (u'Phred Phlyntstone', 32),
        (u'Wylma Phlyntstone', 29),
    ]

    errors = client.insert_rows(table, rows_to_insert)  # API request

    assert errors == []

Copy a table:

    # from google.cloud import bigquery
    # client = bigquery.Client()

    source_dataset = client.dataset('samples', project='bigquery-public-data')
    source_table_ref = source_dataset.table('shakespeare')

    # dataset_id = 'my_dataset'
    dest_table_ref = client.dataset(dataset_id).table('destination_table')

    job = client.copy_table(
        source_table_ref,
        dest_table_ref,
        # Location must match that of the source and destination tables.
        location='US')  # API request

    job.result()  # Waits for job to complete.

    assert job.state == 'DONE'
    dest_table = client.get_table(dest_table_ref)  # API request
    assert dest_table.num_rows > 0

Extract a table to Google Cloud Storage:

    # from google.cloud import bigquery
    # client = bigquery.Client()
    # bucket_name = 'my-bucket'
    project = 'bigquery-public-data'
    dataset_id = 'samples'
    table_id = 'shakespeare'

    destination_uri = 'gs://{}/{}'.format(bucket_name, 'shakespeare.csv')
    dataset_ref = client.dataset(dataset_id, project=project)
    table_ref = dataset_ref.table(table_id)

    extract_job = client.extract_table(
        table_ref,
        destination_uri,
        # Location must match that of the source table.
        location='US')  # API request
    extract_job.result()  # Waits for job to complete.

    print('Exported {}:{}.{} to {}'.format(
        project, dataset_id, table_id, destination_uri))

Delete a table:

    # from google.cloud import bigquery
    # client = bigquery.Client()
    # dataset_id = 'my_dataset'
    # table_id = 'my_table'

    table_ref = client.dataset(dataset_id).table(table_id)
    client.delete_table(table_ref)  # API request

    print('Table {}:{} deleted.'.format(dataset_id, table_id))

Upload table data from a file:

    # from google.cloud import bigquery
    # client = bigquery.Client()
    # filename = '/path/to/file.csv'
    # dataset_id = 'my_dataset'
    # table_id = 'my_table'

    dataset_ref = client.dataset(dataset_id)
    table_ref = dataset_ref.table(table_id)
    job_config = bigquery.LoadJobConfig()
    job_config.source_format = bigquery.SourceFormat.CSV
    job_config.skip_leading_rows = 1
    job_config.autodetect = True

    with open(filename, 'rb') as source_file:
        job = client.load_table_from_file(
            source_file,
            table_ref,
            location='US',  # Must match the destination dataset location.
            job_config=job_config)  # API request

    job.result()  # Waits for table load to complete.

    print('Loaded {} rows into {}:{}.'.format(
        job.output_rows, dataset_id, table_id))

Load table data from Google Cloud Storage#

See also: Loading JSON data from Cloud Storage.

Load a CSV file from Cloud Storage:

    # from google.cloud import bigquery
    # client = bigquery.Client()
    # dataset_id = 'my_dataset'

    dataset_ref = client.dataset(dataset_id)
    job_config = bigquery.LoadJobConfig()
    job_config.schema = [
        bigquery.SchemaField('name', 'STRING'),
        bigquery.SchemaField('post_abbr', 'STRING')
    ]
    job_config.skip_leading_rows = 1
    # The source format defaults to CSV, so the line below is optional.
    job_config.source_format = bigquery.SourceFormat.CSV
    uri = 'gs://cloud-samples-data/bigquery/us-states/us-states.csv'

    load_job = client.load_table_from_uri(
        uri,
        dataset_ref.table('us_states'),
        job_config=job_config)  # API request

    assert load_job.job_type == 'load'

    load_job.result()  # Waits for table load to complete.

    assert load_job.state == 'DONE'
    assert client.get_table(dataset_ref.table('us_states')).num_rows == 50

Load a JSON file from Cloud Storage:

    # from google.cloud import bigquery
    # client = bigquery.Client()
    # dataset_id = 'my_dataset'

    dataset_ref = client.dataset(dataset_id)
    job_config = bigquery.LoadJobConfig()
    job_config.schema = [
        bigquery.SchemaField('name', 'STRING'),
        bigquery.SchemaField('post_abbr', 'STRING')
    ]
    job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
    uri = 'gs://cloud-samples-data/bigquery/us-states/us-states.json'

    load_job = client.load_table_from_uri(
        uri,
        dataset_ref.table('us_states'),
        location='US',  # Location must match that of the destination dataset.
        job_config=job_config)  # API request

    assert load_job.job_type == 'load'

    load_job.result()  # Waits for table load to complete.

    assert load_job.state == 'DONE'
    assert client.get_table(dataset_ref.table('us_states')).num_rows > 0

Load a Parquet file from Cloud Storage:

    # from google.cloud import bigquery
    # client = bigquery.Client()
    # dataset_id = 'my_dataset'

    dataset_ref = client.dataset(dataset_id)
    job_config = bigquery.LoadJobConfig()
    job_config.source_format = bigquery.SourceFormat.PARQUET
    uri = 'gs://cloud-samples-data/bigquery/us-states/us-states.parquet'

    load_job = client.load_table_from_uri(
        uri,
        dataset_ref.table('us_states'),
        job_config=job_config)  # API request

    assert load_job.job_type == 'load'

    load_job.result()  # Waits for table load to complete.

    assert load_job.state == 'DONE'
    assert client.get_table(dataset_ref.table('us_states')).num_rows > 0

Customer Managed Encryption Keys#

Table data is always encrypted at rest, but BigQuery also lets you control which keys it uses to encrypt the data. See Protecting data with Cloud KMS keys in the BigQuery documentation for more details.

Create a new table, using a customer-managed encryption key from Cloud KMS to encrypt it.

    # from google.cloud import bigquery
    # client = bigquery.Client()
    # dataset_id = 'my_dataset'

    table_ref = client.dataset(dataset_id).table('my_table')
    table = bigquery.Table(table_ref)

    # Set the encryption key to use for the table.
    # TODO: Replace this key with a key you have created in Cloud KMS.
    kms_key_name = 'projects/{}/locations/{}/keyRings/{}/cryptoKeys/{}'.format(
        'cloud-samples-tests', 'us-central1', 'test', 'test')
    table.encryption_configuration = bigquery.EncryptionConfiguration(
        kms_key_name=kms_key_name)

    table = client.create_table(table)  # API request

    assert table.encryption_configuration.kms_key_name == kms_key_name

Change the key used to encrypt a table.

    # from google.cloud import bigquery
    # client = bigquery.Client()
    # table_ref = client.dataset('my_dataset').table('my_table')
    # table = client.get_table(table_ref)  # API request
    # original_kms_key_name = table.encryption_configuration.kms_key_name

    assert table.encryption_configuration.kms_key_name == original_kms_key_name

    # Set a new encryption key to use for the destination.
    # TODO: Replace this key with a key you have created in KMS.
    updated_kms_key_name = (
        'projects/cloud-samples-tests/locations/us-central1/'
        'keyRings/test/cryptoKeys/otherkey')
    table.encryption_configuration = bigquery.EncryptionConfiguration(
        kms_key_name=updated_kms_key_name)

    table = client.update_table(
        table, ['encryption_configuration'])  # API request

    assert table.encryption_configuration.kms_key_name == updated_kms_key_name
    assert original_kms_key_name != updated_kms_key_name

Load a file from Cloud Storage, using a customer-managed encryption key from Cloud KMS for the destination table.

    # from google.cloud import bigquery
    # client = bigquery.Client()
    # dataset_id = 'my_dataset'

    dataset_ref = client.dataset(dataset_id)
    job_config = bigquery.LoadJobConfig()
    job_config.autodetect = True
    job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON

    # Set the encryption key to use for the destination.
    # TODO: Replace this key with a key you have created in KMS.
    kms_key_name = 'projects/{}/locations/{}/keyRings/{}/cryptoKeys/{}'.format(
        'cloud-samples-tests', 'us-central1', 'test', 'test')
    encryption_config = bigquery.EncryptionConfiguration(
        kms_key_name=kms_key_name)
    job_config.destination_encryption_configuration = encryption_config
    uri = 'gs://cloud-samples-data/bigquery/us-states/us-states.json'

    load_job = client.load_table_from_uri(
        uri,
        dataset_ref.table('us_states'),
        location='US',  # Location must match that of the destination dataset.
        job_config=job_config)  # API request

    assert load_job.job_type == 'load'

    load_job.result()  # Waits for table load to complete.

    assert load_job.state == 'DONE'
    table = client.get_table(dataset_ref.table('us_states'))
    assert table.encryption_configuration.kms_key_name == kms_key_name

Copy a table, using a customer-managed encryption key from Cloud KMS for the destination table.

    # from google.cloud import bigquery
    # client = bigquery.Client()

    source_dataset = bigquery.DatasetReference(
        'bigquery-public-data', 'samples')
    source_table_ref = source_dataset.table('shakespeare')

    # dataset_id = 'my_dataset'
    dest_dataset_ref = client.dataset(dataset_id)
    dest_table_ref = dest_dataset_ref.table('destination_table')

    # Set the encryption key to use for the destination.
    # TODO: Replace this key with a key you have created in KMS.
    kms_key_name = 'projects/{}/locations/{}/keyRings/{}/cryptoKeys/{}'.format(
        'cloud-samples-tests', 'us-central1', 'test', 'test')
    encryption_config = bigquery.EncryptionConfiguration(
        kms_key_name=kms_key_name)
    job_config = bigquery.CopyJobConfig()
    job_config.destination_encryption_configuration = encryption_config

    job = client.copy_table(
        source_table_ref,
        dest_table_ref,
        # Location must match that of the source and destination tables.
        location='US',
        job_config=job_config)  # API request
    job.result()  # Waits for job to complete.

    assert job.state == 'DONE'
    dest_table = client.get_table(dest_table_ref)
    assert dest_table.encryption_configuration.kms_key_name == kms_key_name

Write query results to a table, using a customer-managed encryption key from Cloud KMS for the destination table.

    # from google.cloud import bigquery
    # client = bigquery.Client()

    job_config = bigquery.QueryJobConfig()

    # Set the destination table. Here, dataset_id is a string, such as:
    # dataset_id = 'your_dataset_id'
    table_ref = client.dataset(dataset_id).table('your_table_id')
    job_config.destination = table_ref

    # Set the encryption key to use for the destination.
    # TODO: Replace this key with a key you have created in KMS.
    kms_key_name = 'projects/{}/locations/{}/keyRings/{}/cryptoKeys/{}'.format(
        'cloud-samples-tests', 'us-central1', 'test', 'test')
    encryption_config = bigquery.EncryptionConfiguration(
        kms_key_name=kms_key_name)
    job_config.destination_encryption_configuration = encryption_config

    # Start the query, passing in the extra configuration.
    query_job = client.query(
        'SELECT 17 AS my_col;',
        # Location must match that of the dataset(s) referenced in the query
        # and of the destination table.
        location='US',
        job_config=job_config)  # API request - starts the query
    query_job.result()

    # The destination table is written using the encryption configuration.
    table = client.get_table(table_ref)
    assert table.encryption_configuration.kms_key_name == kms_key_name

Queries#

Querying data#

Run a query and wait for it to finish:

    # from google.cloud import bigquery
    # client = bigquery.Client()

    query = (
        'SELECT name FROM `bigquery-public-data.usa_names.usa_1910_2013` '
        'WHERE state = "TX" '
        'LIMIT 100')
    query_job = client.query(
        query,
        # Location must match that of the dataset(s) referenced in the query.
        location='US')  # API request - starts the query

    for row in query_job:  # API request - fetches results
        # Row values can be accessed by field name or index
        assert row[0] == row.name == row['name']
        print(row)

Run a dry run query#

    # from google.cloud import bigquery
    # client = bigquery.Client()

    job_config = bigquery.QueryJobConfig()
    job_config.dry_run = True
    job_config.use_query_cache = False
    query_job = client.query(
        ('SELECT name, COUNT(*) as name_count '
         'FROM `bigquery-public-data.usa_names.usa_1910_2013` '
         "WHERE state = 'WA' "
         'GROUP BY name'),
        # Location must match that of the dataset(s) referenced in the query.
        location='US',
        job_config=job_config)  # API request

    # A dry run query completes immediately.
    assert query_job.state == 'DONE'
    assert query_job.dry_run

    print("This query will process {} bytes.".format(
        query_job.total_bytes_processed))

Writing query results to a destination table#

See BigQuery documentation for more information on writing query results.

    # from google.cloud import bigquery
    # client = bigquery.Client()

    job_config = bigquery.QueryJobConfig()

    # Set the destination table. Here, dataset_id is a string, such as:
    # dataset_id = 'your_dataset_id'
    table_ref = client.dataset(dataset_id).table('your_table_id')
    job_config.destination = table_ref

    # The write_disposition specifies the behavior when writing query results
    # to a table that already exists. With WRITE_TRUNCATE, any existing rows
    # in the table are overwritten by the query results.
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE

    # Start the query, passing in the extra configuration.
    query_job = client.query(
        'SELECT 17 AS my_col;',
        # Location must match that of the dataset(s) referenced in the query
        # and of the destination table.
        location='US',
        job_config=job_config)  # API request - starts the query

    rows = list(query_job)  # Waits for the query to finish
    assert len(rows) == 1
    row = rows[0]
    assert row[0] == row.my_col == 17

    # In addition to using the results from the query, you can read the rows
    # from the destination table directly.
    iterator = client.list_rows(
        table_ref, selected_fields=[bigquery.SchemaField('my_col', 'INT64')])

    rows = list(iterator)
    assert len(rows) == 1
    row = rows[0]
    assert row[0] == row.my_col == 17

Run a query using a named query parameter#

See BigQuery documentation for more information on parameterized queries.

    # from google.cloud import bigquery
    # client = bigquery.Client()

    query = """
        SELECT word, word_count
        FROM `bigquery-public-data.samples.shakespeare`
        WHERE corpus = @corpus
        AND word_count >= @min_word_count
        ORDER BY word_count DESC;
    """
    query_params = [
        bigquery.ScalarQueryParameter('corpus', 'STRING', 'romeoandjuliet'),
        bigquery.ScalarQueryParameter('min_word_count', 'INT64', 250)
    ]
    job_config = bigquery.QueryJobConfig()
    job_config.query_parameters = query_params
    query_job = client.query(
        query,
        # Location must match that of the dataset(s) referenced in the query.
        location='US',
        job_config=job_config)  # API request - starts the query

    # Print the results
    for row in query_job:
        print('{}: \t{}'.format(row.word, row.word_count))

    assert query_job.state == 'DONE'

List jobs for a project#

Jobs describe actions performed on data in BigQuery tables:

  • Load data into a table
  • Run a query against data in one or more tables
  • Extract data from a table
  • Copy a table

    # from google.cloud import bigquery
    # client = bigquery.Client(project='my_project')
    # client = bigquery.Client(project='my_project')

    # List the 10 most recent jobs in reverse chronological order.
    # Omit the max_results parameter to list jobs from the past 6 months.
    for job in client.list_jobs(max_results=10):  # API request(s)
        print(job.job_id)

Using BigQuery with Pandas#

As of version 0.29.0, you can use the to_dataframe() function to retrieve query results or table rows as a pandas.DataFrame.

First, ensure that the pandas library is installed by running:

pip install --upgrade pandas

Alternatively, you can install the BigQuery Python client library together with pandas by running:

pip install --upgrade google-cloud-bigquery[pandas]

To retrieve query results as a pandas.DataFrame:

    # from google.cloud import bigquery
    # client = bigquery.Client()

    sql = """
        SELECT name, SUM(number) as count
        FROM `bigquery-public-data.usa_names.usa_1910_current`
        GROUP BY name
        ORDER BY count DESC
        LIMIT 10
    """

    df = client.query(sql).to_dataframe()

To retrieve table rows as a pandas.DataFrame:

    # from google.cloud import bigquery
    # client = bigquery.Client()

    dataset_ref = client.dataset('samples', project='bigquery-public-data')
    table_ref = dataset_ref.table('shakespeare')
    table = client.get_table(table_ref)

    df = client.list_rows(table).to_dataframe()

Changelog#

For a list of all google-cloud-bigquery releases, see the project CHANGELOG.