type holyshared = Engineer<mixed>

技術的なことなど色々

BigQueryのテーブルに複数のログファイルをロードする

Google Cloud Storageに保存されている複数のファイルをBigQueryのテーブルに読み込もうとして、エラーが発生するので調査してみました。
エラーはテーブルにレコードが挿入できないエラーで、フォーマットが当初おかしいのかと思っていました。

Error while reading data, error message: JSON table encountered too many errors, giving up. Rows: 1; errors: 1. Please look into the errors[] collection for more details.
Error while reading data, error message: JSON processing encountered too many errors, giving up. Rows: 1; errors: 1; max bad: 0; error percent: 0
Error while reading data, error message: JSON parsing error in row starting at position 0: Could not convert value to string. Field: sign_in_count; Value: 12

しかしファイルを調べてみてもおかしいところはありません。
調べた結果、スキーマの構造が最初に処理するファイルによってスキーマが決定されていそうでした。

例えば、次のようなログファイルがあった場合、sign_int_countがNullableであることはわかりますが、INTEGERというデータ型まで特定するのは困難です。
代わりにデータ型はNullableなSTRING扱いになるようです。

{ "id": "a", "name": "demo1",  sign_in_count: null, "createdAt": "2021-02-17 07:33:05 UTC", "updatedAt": null }

次のようだとおそらくNullableなINTEGERであることは推測できます。
なぜなら、sign_in_countに2が含まれていて、数値であることがわかるからです。

{ "id": "a", "name": "demo1",  sign_in_count: null, "createdAt": "2021-02-17 07:33:05 UTC", "updatedAt": null }
{ "id": "b", "name": "demo2",  sign_in_count: 2, "createdAt": "2021-02-17 07:33:05 UTC", "updatedAt": null }

今までだと単一のファイルで読み込むことが多く、問題が出ませんでした。
それは単一のファイルだとフィールドのデータパターンが全部収集できるので、フィールドのデータ型が推測しやすかったからだと思います。

解決方法

この問題を解決する為に、読み込み時にスキーマを明示的に指定するようにしました。
明示的に指定することで、正しく読み込むことができます。
200ファイル近くのファイル読み込んでみましたが特に問題は起きませんでした。

その代わりautodetectのメリットがなくなります。

import { BigQuery } from '@google-cloud/bigquery'
import { Storage, File } from '@google-cloud/storage'

const storage = new Storage({ projectId: process.env.GCP_PROJECT_ID })
const bigQuery = new BigQuery({ projectId: process.env.GCP_PROJECT_ID })

const logFilesOf = async (logDate: string) => {
  const prefix = `logs/${logDate}/`
  const bucket = storage.bucket(process.env.GCP_BACKUP_BUCKET_NAME!)
  const [files] = await bucket.getFiles({ prefix })
  return files
    .filter((file: File) => file.name !== prefix)
    .filter((file: File) => {
      const size = Number(file.metadata.size)
      return size > 0
    })
}

const loadFilesOf = async (tableId: string, logDate: string) => {
  const files = await logFilesOf(logDate)

  if (files.length <= 0) {
    return
  }

  const table = bigQuery.dataset('example_dataset').table(tableId)

  return table.createLoadJob(files as any, {
    sourceFormat: 'NEWLINE_DELIMITED_JSON',
    autodetect: true,
    writeDisposition: 'WRITE_TRUNCATE',
    schema: {
      fields: [
        {
          name: 'id',
          type: 'STRING',
          mode: 'REQUIRED',
        },
        {
          name: 'name',
          type: 'STRING',
          mode: 'REQUIRED',
        },
        {
          name: 'sign_in_count',
          type: 'INTEGER',
        },
        {
          name: 'createdAt',
          type: 'STRING',
          mode: 'REQUIRED',
        },
        {
          name: 'updatedAt',
          type: 'STRING',
        },
      ],
    },
  })
}

loadFilesOf('example', '2021-08-09')
  .then(() => {
    console.log('done')
  })
  .catch((err: Error) => {
    console.log(err.stack)
  })