En esta página, se describen en detalle los siguientes atributos de los flujos de cambios:
- Su modelo de partición basado en divisiones
- El formato y el contenido de los registros de flujos de cambios
- La sintaxis de bajo nivel que se usa para consultar esos registros
- Ejemplo del flujo de trabajo de la consulta
La información de esta página es la más relevante para usar la API de Spanner a fin de consultar flujos de cambios directamente. Aplicaciones que, en cambio, usan Dataflow para leer flujos de cambios datos no necesitan trabajar directamente con el modelo de datos que se describe aquí.
Para obtener una guía introductoria más amplia sobre los flujos de cambios, consulta Flujos de cambios descripción general.
Partición de flujos de cambios
Cuando se produce un cambio en una tabla supervisada por un flujo de cambios Spanner escribe el registro de flujos de cambios correspondiente en la base de datos. de forma síncrona en la misma transacción en la que cambian los datos. Esta garantiza que, si la transacción se realiza correctamente, Spanner también capturaron correctamente y conservaron el cambio. Internamente, Spanner ubica el registro de flujos de cambios y el cambio de datos para que el mismo servidor los procese y minimice la sobrecarga de escritura.
Como parte del DML a una división en particular, Spanner agrega la escritura a los datos del flujo de cambios correspondiente. dividir en la misma transacción. Debido a esta colocación, cambiar no agregan coordinación adicional entre los recursos de entrega, minimiza la sobrecarga de confirmación de la transacción.
Spanner escala mediante la división y la combinación dinámica de datos según según la carga y el tamaño de la base de datos, y la distribución de divisiones entre los recursos de entrega.
Para habilitar flujos de cambios, escrituras y lecturas para escalar, las divisiones de Spanner y combina el almacenamiento interno del flujo de cambios con los datos de la base de datos evitar los hotspots automáticamente. Para admitir la lectura de registros de flujos de cambios en casi en tiempo real a medida que se escalan las escrituras de la base de datos, la API de Spanner es diseñada para que un flujo de cambios se consulte de forma simultánea mediante el flujo de cambios y particiones. Las particiones de flujos de cambios se asignan a las divisiones de datos de flujos de cambios que que contienen los registros de flujos de cambios. Si las particiones de un flujo de cambios cambian de forma dinámica con el tiempo y se correlacionan divide y combina dinámicamente los datos de la base de datos.
Una partición de flujo de cambios contiene registros para un rango de claves inmutable para un un intervalo de tiempo específico. Cualquier partición de flujo de cambios se puede dividir en una o más particiones de flujo de cambios, o bien se puede combinar con otras particiones de flujos de cambios. Cuando estos se producen eventos de división o combinación, se crean particiones secundarias para capturar los cambios para sus respectivos rangos de claves inmutables para el próximo intervalo de tiempo. Además, a los registros de cambios de datos, una consulta de flujos de cambios devuelve los registros de particiones secundarios notificar a los lectores sobre las nuevas particiones del flujo de cambios que deben consultarse como registros de señal de monitoreo de funcionamiento para indicar el progreso hacia delante cuando no se han producido operaciones de escritura. recientemente.
Cuando se consulta una partición de flujo de cambios en particular, se que se muestra en el orden de las marcas de tiempo de confirmación. Cada registro de cambios se devuelve una vez. Entre las particiones del flujo de cambios, no hay un orden de cambio garantizado. registros. Los registros de cambios para una clave primaria específica se devuelven solo en una a una partición específica para un intervalo de tiempo determinado.
Debido al linaje de particiones superior-secundario, para procesar cambios en un clave específica en el orden de marca de tiempo de confirmación, registros que muestra el elemento secundario las particiones deben procesarse solo después de que los registros se procesaron correctamente todas las particiones.
Funciones de lectura y sintaxis de consultas de flujos de cambios
GoogleSQL
Puedes consultar flujos de cambios con el
ExecuteStreamingSql
en la API de Cloud. Spanner crea automáticamente una función de lectura especial junto con
con el flujo de cambios. La función de lectura proporciona acceso al cambio
de transmisión. La convención de nomenclatura de la función de lectura es
READ_change_stream_name
Si suponemos que existe un flujo de cambios SingersNameStream
en la base de datos, el
la sintaxis de la consulta para GoogleSQL es la siguiente:
SELECT ChangeRecord
FROM READ_SingersNameStream (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
read_options
)
La función de lectura acepta los siguientes argumentos:
Nombre del argumento | Tipo | ¿Es obligatorio? | Descripción |
---|---|---|---|
start_timestamp |
TIMESTAMP |
Obligatorio | Especifica que los registros con commit_timestamp mayor o igual que start_timestamp
debería mostrarse. El valor debe estar dentro del flujo de cambios
del período de retención y debe ser menor o igual que la hora actual
y es mayor o igual que la marca de tiempo de la creación del flujo de cambios. |
end_timestamp |
TIMESTAMP |
Opcional (predeterminado: NULL ) |
Especifica que se registre con commit_timestamp menos.
mayor o igual que end_timestamp debe
se devuelvan. El valor debe estar dentro del rango de retención del flujo de cambios
del valor y mayor o igual que el start_timestamp . La consulta
finaliza después de mostrar todos los ChangeRecords hasta end_timestamp
o el usuario finalizará la conexión. Si es NULL o no
especificado, la consulta se ejecuta hasta que se devuelvan todos los ChangeRecords o
el usuario cancela la conexión. |
partition_token |
STRING |
Opcional (predeterminado: NULL ) |
Especifica qué partición de flujo de cambios consultar, según el
contenido de particiones secundarias
registros. Si es NULL o no se especifica, esto significa que
lector está consultando el flujo de cambios por primera vez y tiene
no obtuvo tokens de partición específicos para consultar. |
heartbeat_milliseconds |
INT64 |
Obligatorio | Determina la frecuencia con la que se devuelve una señal de monitoreo de funcionamiento.
en caso de que no haya transacciones confirmadas en esta partición.
El valor debe estar entre 1,000 (un segundo) y 300,000 (cinco
minutos). |
read_options |
ARRAY |
Opcional (predeterminado: NULL ) |
Opciones de lectura adicionales reservadas para uso futuro. Por el momento, el único valor permitido es NULL . |
Se recomienda crear un método conveniente para compilar el texto del leer la consulta de función y vincular parámetros a ella, como se muestra a continuación ejemplo.
Java
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT ChangeRecord FROM READ_SingersNameStream" + "(" + " start_timestamp => @startTimestamp," + " end_timestamp => @endTimestamp," + " partition_token => @partitionToken," + " heartbeat_milliseconds => @heartbeatMillis" + ")"; // Helper method to conveniently create change stream query texts and bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("startTimestamp") .to(startTimestamp) .bind("endTimestamp") .to(endTimestamp) .bind("partitionToken") .to(partitionToken) .bind("heartbeatMillis") .to(heartbeatMillis) .build(); }
PostgreSQL
Puedes consultar flujos de cambios con el
API de ExecuteStreamingSql
.
Spanner crea automáticamente una función de lectura especial junto con
con el flujo de cambios. La función de lectura proporciona acceso al cambio
de transmisión. La convención de nomenclatura de la función de lectura es
spanner.read_json_change_stream_name
Si suponemos que existe un flujo de cambios SingersNameStream
en la base de datos, el
La sintaxis de la consulta para PostgreSQL es la siguiente:
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
null
)
La función de lectura acepta los siguientes argumentos:
Nombre del argumento | Tipo | ¿Es obligatorio? | Descripción |
---|---|---|---|
start_timestamp |
timestamp with time zone |
Obligatorio | Especifica que cambian los registros en los que commit_timestamp es mayor o igual que start_timestamp
debería mostrarse. El valor debe estar dentro del flujo de cambios
del período de retención y debe ser menor o igual que la hora actual
y es mayor o igual que la marca de tiempo de la creación del flujo de cambios. |
end_timestamp |
timestamp with timezone |
Opcional (predeterminado: NULL ) |
Especifica que cambia los registros con commit_timestamp
menor o igual que end_timestamp debe
se devuelvan. El valor debe estar dentro del rango de retención del flujo de cambios
del valor y mayor o igual que el start_timestamp .
La consulta finaliza después de mostrar todos los registros de cambios hasta
end_timestamp o el usuario finalizará la conexión.
Si es NULL , la consulta se ejecuta hasta que se muestren todos los registros de cambio
o el usuario finalizará la conexión. |
partition_token |
text |
Opcional (predeterminado: NULL ) |
Especifica qué partición de flujo de cambios consultar, según el
contenido de particiones secundarias
registros. Si es NULL o no se especifica, esto significa que
lector está consultando el flujo de cambios por primera vez y tiene
no obtuvo tokens de partición específicos para consultar. |
heartbeat_milliseconds |
bigint |
Obligatorio | Determina la frecuencia con la que se devolverá un objeto ChangeRecord.
en caso de que no haya transacciones confirmadas en esta partición.
El valor debe estar entre 1,000 (un segundo) y 300,000 (cinco
minutos). |
null |
null |
Obligatorio | Reservado para uso futuro |
Se recomienda crear un método conveniente para compilar el texto del leer y vincular parámetros a ella, como se muestra a continuación ejemplo.
Java
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT * FROM \"spanner\".\"read_json_SingersNameStream\"" + "($1, $2, $3, $4, null)"; // Helper method to conveniently create change stream query texts and bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("p1") .to(startTimestamp) .bind("p2") .to(endTimestamp) .bind("p3") .to(partitionToken) .bind("p4") .to(heartbeatMillis) .build(); }
Cambiar el formato de registro de las transmisiones
GoogleSQL
La función de lectura de flujos de cambios muestra una sola columna ChangeRecord
de tipo
ARRAY<STRUCT<...>>
En cada fila, este array siempre contiene un solo elemento.
Los elementos del array tienen el siguiente tipo:
STRUCT <
data_change_record ARRAY<STRUCT<...>>,
heartbeat_record ARRAY<STRUCT<...>>,
child_partitions_record ARRAY<STRUCT<...>>
>
Este struct tiene tres campos: data_change_record
,
heartbeat_record
y child_partitions_record
, cada uno de los tipos
ARRAY<STRUCT<...>>
En cualquier fila en la que se lea la función de lectura del flujo de cambios
muestra, solo uno de estos tres campos contiene un valor; los otros dos
están vacíos o NULL
. Estos campos de array contienen, como máximo, un elemento.
En las siguientes secciones, se examina cada uno de estos tres tipos de registros.
PostgreSQL
La función de lectura de flujos de cambios muestra una sola columna ChangeRecord
de
escribe JSON
con la siguiente estructura:
{
"data_change_record" : {},
"heartbeat_record" : {},
"child_partitions_record" : {}
}
Hay tres claves posibles en este objeto: data_change_record
,
heartbeat_record
y child_partitions_record
, el valor correspondiente
tipo es JSON
.
En cualquier fila que muestre la función de lectura del flujo de cambios, solo
existe una de estas tres claves.
En las siguientes secciones, se examina cada uno de estos tres tipos de registros.
Registros de cambios de datos
Un registro de cambios de datos contiene un conjunto de cambios en una tabla con la mismo tipo de modificación (insertar, actualizar o eliminar) que se haya confirmado marca de tiempo de confirmación en una partición de flujo de cambios transacción. Se pueden devolver múltiples registros de cambios de datos para el mismo transacción en múltiples particiones de flujos de cambios.
Todos los registros de cambios de datos tienen commit_timestamp
, server_transaction_id
,
y record_sequence
, que juntos determinan el orden del cambio
transmitir para obtener un registro de transmisión. Estos tres campos son suficientes para derivar
el orden de los cambios
y proporcionar coherencia externa.
Ten en cuenta que varias transacciones pueden tener la misma marca de tiempo de confirmación si
cuando tocan datos que no se superponen. El campo server_transaction_id
ofrece la capacidad de distinguir qué conjunto de cambios (potencialmente
entre particiones del flujo de cambios) se emitieron dentro del mismo
transacción. Sincronízalo con record_sequence
y
Los campos number_of_records_in_transaction
te permiten almacenar en búfer y ordenar
todos los registros de una transacción en particular.
Los campos de un registro de cambios de datos incluyen los siguientes:
GoogleSQL
Campo | Tipo | Descripción |
---|---|---|
commit_timestamp |
TIMESTAMP |
La marca de tiempo en la que se confirmó el cambio. |
record_sequence |
STRING |
Es el número de secuencia del registro dentro de la transacción. Se garantiza que los números de secuencia
ser únicos y crecientes monótonamente (pero no necesariamente contiguos) dentro de una transacción. Ordenar los registros por la misma
server_transaction_id de record_sequence a
a reconstruir el orden de los cambios dentro de la transacción.
Spanner puede optimizar este orden para obtener un mejor rendimiento y es posible que no siempre coincida con el orden original que proporcionan los usuarios. |
server_transaction_id |
STRING |
Una cadena única a nivel global que representa la transacción en en la que se confirmó el cambio. El valor solo debe ser se usan en el contexto del procesamiento de registros de flujos de cambios y no se correlacionado con el ID de transacción en la API de Spanner. |
is_last_record_in_transaction_in_partition |
BOOL |
Indica si este es el último registro de una transacción en la partición actual. |
table_name |
STRING |
Nombre de la tabla afectada por el cambio. |
value_capture_type |
STRING |
Describe el tipo de captura de valores que se especificó en el la configuración del flujo de cambios cuando se capturó este cambio. El tipo de captura de valor puede ser |
column_types |
ARRAY<STRUCT< |
El nombre de la columna, el tipo de columna si es una clave primaria, y la posición de la columna como se define en el esquema (“ordinal_position”). La primera columna de una tabla en el esquema tendrían una posición ordinal "1". El tipo de columna pueden estar anidadas en columnas de array. El formato debe coincidir con la estructura de tipos que se describe en la referencia de la API de Spanner. |
mods |
ARRAY<STRUCT< |
Describe los cambios que se realizaron, incluida la clave primaria
anteriores, así como los valores nuevos de las columnas modificadas o a las que se les hace seguimiento.
La disponibilidad y el contenido de los valores antiguos y nuevos dependerán del value_capture_type configurado. Los campos new_values y old_values solo contienen las columnas sin clave. |
mod_type |
STRING |
Describe el tipo de cambio. Puede ser INSERT , UPDATE o
DELETE |
number_of_records_in_transaction |
INT64 |
El número de registros de cambios de datos que forman parte de este transacción en todas las particiones del flujo de cambios. |
number_of_partitions_in_transaction |
INT64 |
La cantidad de particiones que devolverán registros de cambios de datos para esta transacción. |
transaction_tag |
STRING |
Etiqueta de transacción asociada a esta transacción. |
is_system_transaction |
BOOL |
Indica si la transacción es una transacción del sistema. |
PostgreSQL
Campo | Tipo | Descripción |
---|---|---|
commit_timestamp |
STRING |
La marca de tiempo en la que se confirmó el cambio. |
record_sequence |
STRING |
Es el número de secuencia del registro dentro de la transacción. Se garantiza que los números de secuencia ser únicos y crecientes monótonamente (pero no necesariamente contiguos) dentro de una transacción. Ordenar los registros por la misma `server_transaction_id` por `record_seq` para reconstruir el orden de los cambios dentro del transacción. |
server_transaction_id |
STRING |
Una cadena única a nivel global que representa la transacción en en la que se confirmó el cambio. El valor solo debe ser usarse en el contexto del procesamiento de registros de flujos de cambios y no se correlacionado con el ID de transacción en la API de Spanner |
is_last_record_in_transaction_in_partition |
BOOLEAN |
Indica si este es el último registro de una transacción en la partición actual. |
table_name |
STRING |
Nombre de la tabla afectada por el cambio. |
value_capture_type |
STRING |
Describe el tipo de captura de valores que se especificó en el la configuración del flujo de cambios cuando se capturó este cambio. El tipo de captura de valor puede ser |
column_types |
[ { "name": <STRING>, "type": { "code": <STRING> }, "is_primary_key": <BOOLEAN>, "ordinal_position": <NUMBER> }, ... ] |
El nombre de la columna, el tipo de columna si es una clave primaria, y la posición de la columna como se define en el esquema (“ordinal_position”). La primera columna de una tabla en el esquema tendrían una posición ordinal "1". El tipo de columna pueden estar anidadas en columnas de array. El formato debe coincidir con la estructura de tipos que se describe en la referencia de la API de Spanner. |
mods |
[ { "keys": {<STRING> : <STRING>}, "new_values": { <STRING> : <VALUE-TYPE>, [...] }, "old_values": { <STRING> : <VALUE-TYPE>, [...] }, }, [...] ] |
Describe los cambios que se realizaron, incluida la clave primaria
anteriores, los valores anteriores y los nuevos de los valores modificados o
columnas. La disponibilidad y el contenido de los valores antiguos y nuevos dependerán de
en el value_capture_type configurado. Los operadores new_values y
Los campos old_values solo contienen las columnas que no son de clave.
|
mod_type |
STRING |
Describe el tipo de cambio. Puede ser INSERT , UPDATE o
DELETE |
number_of_records_in_transaction |
INT64 |
El número de registros de cambios de datos que forman parte de este transacción en todas las particiones del flujo de cambios. |
number_of_partitions_in_transaction |
NUMBER |
La cantidad de particiones que devolverán registros de cambios de datos para esta transacción. |
transaction_tag |
STRING |
Etiqueta de transacción asociada a esta transacción. |
is_system_transaction |
BOOLEAN |
Indica si la transacción es una transacción del sistema. |
A continuación, se muestra un par de ejemplos de registros de cambios de datos. Describen una sola transacción en la que hay un y la transferencia entre dos cuentas. Ten en cuenta que las dos cuentas están en flujo de cambios independiente y particiones.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z",
"Balance": 1500
},
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
"record_sequence": "00000001",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id2"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 2000
},
"old_values": {
"LastUpdate": "2022-01-20T11:25:00.199915Z",
"Balance": 1500
},
},
...
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
El siguiente registro de cambios de datos es un ejemplo de un registro con el valor
tipo de captura "NEW_VALUES"
. Ten en cuenta que solo se propagan los valores nuevos.
Solo se modificó la columna "LastUpdate"
, por lo que solo esa columna
fue devuelto.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z"
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
El siguiente registro de cambios de datos es un ejemplo de un registro con el valor
tipo de captura "NEW_ROW"
. Solo el "LastUpdate"
se modificó la columna, pero se muestran todas las columnas con seguimiento.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
El siguiente registro de cambios de datos es un ejemplo de un registro con el valor
tipo de captura "NEW_ROW_AND_OLD_VALUES"
. Solo el "LastUpdate"
se modificó la columna, pero se muestran todas las columnas con seguimiento. Esta captura de valor
type captura el valor nuevo y el anterior de LastUpdate
.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z"
}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW_AND_OLD_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
Registros de latidos
Cuando se devuelve un registro de señal de monitoreo de funcionamiento, este indica que todos los cambios
commit_timestamp
menor o igual que el registro de señal de monitoreo de funcionamiento
Se devolvieron timestamp
y los registros de datos futuros en este
la partición debe tener marcas de tiempo de confirmación más altas que las que devuelve el
registro de monitoreo de funcionamiento. Los registros de señal de monitoreo de funcionamiento se muestran cuando no hay datos
los cambios escritos en una partición. Cuando se escriben cambios en los datos
la partición, se puede usar data_change_record.commit_timestamp
en su lugar
de heartbeat_record.timestamp
para indicar que el lector está presentando
el progreso en la lectura de la partición.
Puedes usar los registros de señal de monitoreo de funcionamiento que se muestran en particiones para sincronizar
lectores de todas las particiones. Una vez que todos los lectores hayan recibido un
señal de monitoreo de funcionamiento superior o igual a alguna marca de tiempo A
, o bien que recibieron datos o elementos secundarios
registros de partición mayores o iguales a la marca de tiempo A
, los lectores sabrán que recibieron
todos los registros confirmados antes o después de esa marca de tiempo A
y pueden comenzar
procesar los registros almacenados en búfer, por ejemplo, ordenar la partición cruzada
registros por marca de tiempo y agrupándolos por server_transaction_id
.
Un registro de señal de monitoreo de funcionamiento contiene solo un campo:
GoogleSQL
Campo | Tipo | Descripción |
---|---|---|
timestamp |
TIMESTAMP |
La marca de tiempo del registro de la señal de monitoreo de funcionamiento. |
PostgreSQL
Campo | Tipo | Descripción |
---|---|---|
timestamp |
STRING |
La marca de tiempo del registro de la señal de monitoreo de funcionamiento. |
Un ejemplo de registro de señal de monitoreo de funcionamiento, en el que se comunica que todos los registros con marcas de tiempo menores o iguales que la marca de tiempo de este registro.
heartbeat_record: {
"timestamp": "2022-09-27T12:35:00.312486Z"
}
Registros de particiones secundarias
Un registro de particiones secundarias devuelve información sobre las particiones secundarias: sus tokens de partición, los tokens de sus particiones superiores y los
start_timestamp
, que representa la marca de tiempo más antigua que el elemento secundario
particiones contienen registros de cambios. Registros cuyas marcas de tiempo de confirmación
inmediatamente anteriores a la child_partitions_record.start_timestamp
se
que se devuelven en la partición actual. Después de mostrar todos los
secundarios para esta partición, esta consulta devolverá
un estado de éxito que indica que todos los registros se devolvieron para este
por cada partición.
Los campos de un registro de particiones secundarios incluyen lo siguiente:
GoogleSQL
Campo | Tipo | Descripción |
---|---|---|
start_timestamp |
TIMESTAMP |
Registros de cambios de datos que muestra el elemento secundario
las particiones en este registro de partición secundario tienen una marca de tiempo de confirmación
mayor o igual que start_timestamp . Cuando se consulta una partición secundaria, la consulta debe
especifica el token de partición secundario y un start_timestamp mayor o igual que
child_partitions_token.start_timestamp Todos los registros de particiones secundarias
que muestra una partición tienen el mismo start_timestamp , y el valor
La marca de tiempo siempre se encuentra entre el start_timestamp especificado de la consulta.
y end_timestamp . |
record_sequence |
STRING |
Una secuencia monótonamente creciente
que se puede usar para definir el orden de la
registro de particiones secundarias cuando hay varios
registros de particiones secundarios devueltos con el mismo start_timestamp en un
una partición específica. El token de partición,
start_timestamp y
record_sequence identifican de forma exclusiva un
el registro de particiones secundarias. |
child_partitions |
ARRAY<STRUCT< |
Muestra un conjunto de particiones secundarias y su información asociada. Esto incluye la cadena del token de partición que se usa para identificar al elemento secundario por cada partición en las consultas, así como los tokens de su y particiones. |
PostgreSQL
Campo | Tipo | Descripción |
---|---|---|
start_timestamp |
STRING |
Registros de cambios de datos que muestra el elemento secundario
las particiones en este registro de particiones secundarias tienen una marca de tiempo de confirmación
mayor o igual que start_timestamp . Cuando se realiza una consulta a un elemento secundario
partición, la consulta debe especificar el token de partición secundaria y un
start_timestamp mayor que o igual que
child_partitions_token.start_timestamp Todas las particiones secundarias
los registros que devuelve una partición tienen el mismo
start_timestamp y la marca de tiempo siempre se encuentra entre
la start_timestamp especificada de la consulta y
end_timestamp
|
record_sequence |
STRING |
Una secuencia monótonamente creciente
que se puede usar para definir el orden de la
registro de particiones secundarias cuando hay varios
registros de particiones secundarios devueltos con el mismo start_timestamp en un
una partición específica. El token de partición,
start_timestamp y
record_sequence identifican de forma exclusiva un
el registro de particiones secundarias. |
child_partitions |
[ { "token": <STRING>, "parent_partition_tokens": [<STRING>], }, [...] ] |
Muestra un array de particiones secundarias y su información asociada. Esto incluye la cadena del token de partición que se usa para identificar al elemento secundario por cada partición en las consultas, así como los tokens de su y particiones. |
El siguiente es un ejemplo de un registro de partición secundario:
child_partitions_record: {
"start_timestamp": "2022-09-27T12:40:00.562986Z",
"record_sequence": "00000001",
"child_partitions": [
{
"token": "child_token_1",
// To make sure changes for a key is processed in timestamp
// order, wait until the records returned from all parents
// have been processed.
"parent_partition_tokens": ["parent_token_1", "parent_token_2"]
}
],
}
Flujo de trabajo de consultas de transmisiones de cambios
Ejecutar consultas de flujos de cambios con el
API de ExecuteStreamingSql
, con una función de un solo uso
solo lectura
de transacciones y una
un límite de marca de tiempo estricto El cambio
La función de lectura de transmisión te permite especificar los elementos start_timestamp
y
end_timestamp
para el intervalo de tiempo que te interesa. Todos los registros de cambios
dentro del período de retención son accesibles con la estrategia
límite de marca de tiempo.
Todos los demás
TransactionOptions
no son válidos para las consultas sobre el flujo de cambios. Además,
si TransactionOptions.read_only.return_read_timestamp
se configura como verdadero,
se mostrará un valor especial de kint64max - 1
en Transaction
mensaje que describe la transacción, en lugar de una operación
y marca de tiempo. Este valor especial debe descartarse y no debe usarse
consultas posteriores.
Cada consulta de flujos de cambios puede devolver cualquier cantidad de filas, y cada una contiene un registro de cambios de datos, un registro de señales de monitoreo de funcionamiento o particiones registro. No es necesario establecer una fecha límite para la solicitud.
Ejemplo:
El flujo de trabajo de la consulta de transmisión comienza con la emisión de la primera transmisión de cambios
consulta mediante la especificación de partition_token
como NULL
. La consulta debe especificar
la función de lectura para el flujo de cambios, la marca de tiempo de inicio y finalización de interés
el intervalo de latidos. Cuando end_timestamp
es NULL
, la consulta mantiene
y mostrar cambios de datos hasta que finalice la partición.
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:00Z",
end_timestamp => NULL,
partition_token => NULL,
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:00Z',
NULL,
NULL,
10000,
NULL
) ;
Procesar registros de datos de esta consulta hasta que los registros de partición secundaria se
que se devuelven. En el siguiente ejemplo, dos registros de particiones secundarios y tres registros
los tokens y, luego, finaliza la consulta. Registros de partición secundaria de un
esta consulta específica siempre comparte el mismo start_timestamp
.
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": 1000012389,
"child_partitions": [
{
"token": "child_token_1",
// Note parent tokens are null for child partitions returned
// from the initial change stream queries.
"parent_partition_tokens": [NULL]
}
{
"token": "child_token_2",
"parent_partition_tokens": [NULL]
}
],
}
child partitions record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": 1000012390,
"child_partitions": [
{
"token": "child_token_3",
"parent_partition_tokens": [NULL]
}
],
}
Para procesar cambios futuros después del 2022-05-01T09:00:01Z
, crea tres nuevos
consultas y ejecutarlas en paralelo. Las tres consultas juntas muestran el valor futuro
cambios de datos para el mismo rango de claves que cubre su elemento superior. Siempre establece
start_timestamp
al start_timestamp
en el mismo registro de partición secundaria y
usa el mismo end_timestamp
y el mismo intervalo de señal de monitoreo de funcionamiento para procesar los registros
de forma coherente en todas las consultas.
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_1",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_2",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_3",
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_1',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_2',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_3',
10000,
NULL
);
Después de un tiempo, la consulta en child_token_2
finaliza después de mostrar otra.
registro de partición secundario, indica que se creará una partición nueva
que cubre cambios futuros tanto para child_token_2
como para child_token_3
a partir de
2022-05-01T09:30:15Z
La consulta devolverá exactamente el mismo registro el
child_token_3
, ya que ambas son las particiones superiores del elemento child_token_4
nuevo
Para garantizar un procesamiento estricto y ordenado de los registros de datos para una clave en particular,
La consulta en child_token_4
solo debe comenzar después de que hayan terminado todos los elementos superiores.
que en este caso son child_token_2
y child_token_3
. Crear solo una consulta
Para cada token de partición secundario, el diseño del flujo de trabajo de consultas debe designar uno
superior que espere y programe la consulta en child_token_4
.
child partitions record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:30:15Z",
"record_sequence": 1000012389,
"child_partitions": [
{
"token": "child_token_4",
"parent_partition_tokens": [child_token_2, child_token_3],
}
],
}
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream(
start_timestamp => "2022-05-01T09:30:15Z",
end_timestamp => NULL,
partition_token => "child_token_4",
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:30:15Z',
NULL,
'child_token_4',
10000,
NULL
);
Encontrar ejemplos de manejo y análisis de registros de flujos de cambios en SpannerIO de Apache Beam Conector de Dataflow activado GitHub: