In questa pagina vengono descritti in dettaglio i seguenti attributi delle modifiche in tempo reale:
- Il suo modello di partizionamento basato su suddivisione
- Il formato e il contenuto dei record delle modifiche in tempo reale
- La sintassi di basso livello utilizzata per eseguire query su questi record
- Un esempio del flusso di lavoro delle query
Le informazioni su questa pagina sono più pertinenti per l'utilizzo dell'API Spanner per eseguire direttamente query sulle modifiche in tempo reale. Applicazioni che invece utilizzano Dataflow per leggere le modifiche in tempo reale dati non devono lavorare direttamente con il modello dei dati. descritti qui.
Per una guida introduttiva più ampia ai modifiche in tempo reale, consulta Stream di modifiche Panoramica.
Partizioni delle modifiche in tempo reale
Quando si verifica una modifica in una tabella monitorata da uno stream di modifiche, Spanner scrive un record dello stream di modifiche corrispondente nel database, in modo sincrono nella stessa transazione della modifica dei dati. Questo garantisce che, se la transazione ha esito positivo, Spanner ha anche ha acquisito correttamente e ha mantenuto la modifica. Internamente, Spanner co-posiziona il record di modifiche in tempo reale e la modifica dei dati in modo che vengano elaborati dallo stesso server per ridurre al minimo l'overhead di scrittura.
Come parte del DML per una particolare suddivisione, accoda la scrittura ai dati corrispondenti del flusso di modifiche suddivisi nella stessa transazione. A causa di questa colocation, cambia I flussi di dati non aggiungono ulteriore coordinazione tra le risorse di distribuzione, il che riduce al minimo l'overhead del commit della transazione.
Spanner offre scalabilità dividendo e unendo dinamicamente i dati in base sul carico e sulle dimensioni del database e sulla distribuzione delle suddivisioni tra le risorse di servizio.
A abilita le scritture e le letture dei modifiche in tempo reale per la scalabilità, Spanner e unisce la memoria interna delle modifiche in tempo reale ai dati del database, evitando automaticamente gli hotspot. Per supportare la lettura dei record di modifiche in tempo reale quasi in tempo reale quando il database scrive la scalabilità, l'API Spanner progettato per eseguire query su un flusso di modifiche contemporaneamente utilizzando partizioni di Compute Engine. Le partizioni delle modifiche in tempo reale vengono mappate alle suddivisioni dei dati delle modifiche in tempo reale che contengono i record di modifiche in tempo reale. Una modifica alle partizioni di una modifica in tempo reale in modo dinamico nel tempo e sono correlati al modo in cui Spanner divide e unisce dinamicamente i dati del database.
Una partizione di modifiche in tempo reale contiene i record per un intervallo di chiavi immutabile per un in un intervallo di tempo specifico. Qualsiasi partizione di modifiche in tempo reale può essere suddivisa in una o più partizioni di modifiche in tempo reale oppure unite ad altre partizioni di flusso di modifiche. Quando questi che si verificano eventi di suddivisione o unione, le partizioni secondarie vengono create per acquisire le modifiche per i rispettivi intervalli di chiavi immutabili per l'intervallo di tempo successivo. Inoltre, ai record delle modifiche ai dati, una query di flusso di modifiche restituisce i record di partizione figlio notifica ai lettori le nuove partizioni delle modifiche in tempo reale su cui è necessario eseguire query; come record di heartbeat per indicare l'avanzamento in avanti quando non sono state eseguite scritture di recente.
Quando esegui una query su una determinata partizione dello stream di modifiche, i record di modifica vengono restituito in ordine di timestamp del commit. Ogni record di modifica viene restituito esattamente una volta sola. Tra le partizioni di modifiche in tempo reale, non è garantito l'ordine delle modifiche record. I record delle modifiche per una determinata chiave primaria vengono restituiti solo in una per un determinato intervallo di tempo.
A causa della struttura delle partizioni principali e secondarie, per elaborare le modifiche per una determinata chiave in ordine di timestamp del commit, i record restituiti dalle partizioni secondarie devono essere elaborati solo dopo aver elaborato i record di tutte le partizioni principali.
Funzioni di lettura delle modifiche in tempo reale e sintassi delle query
GoogleSQL
Esegui query sui flussi di variazioni utilizzando l'API
ExecuteStreamingSql
. Spanner crea automaticamente una funzione di lettura speciale
con le modifiche in tempo reale. La funzione di lettura fornisce accesso alla modifica
i record dello stream. La convenzione di denominazione della funzione di lettura è
READ_change_stream_name
.
Presumendo che nel database esista un flusso di modifiche SingersNameStream
,
la sintassi delle query per GoogleSQL è la seguente:
SELECT ChangeRecord
FROM READ_SingersNameStream (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
read_options
)
La funzione di lettura accetta i seguenti argomenti:
Nome argomento | Tipo | Obbligatorio? | Descrizione |
---|---|---|---|
start_timestamp |
TIMESTAMP |
Obbligatorio | Specifica che i record con commit_timestamp maggiore o uguale a start_timestamp
da restituire. Il valore deve rientrare nel periodo di conservazione del stream di modifiche, deve essere minore o uguale all'ora corrente e maggiore o uguale al timestamp della creazione dello stream di modifiche. |
end_timestamp |
TIMESTAMP |
Facoltativo (valore predefinito: NULL ) |
Specifica che i record con commit_timestamp in meno
o uguale a end_timestamp deve
da restituire. Il valore deve rientrare nella conservazione delle modifiche in tempo reale
e maggiore o uguale al start_timestamp . La query
termina dopo aver restituito tutti i ChangeRecord fino a end_timestamp
o l'utente interrompe la connessione. Se NULL o meno
specificato, la query viene eseguita finché non vengono restituiti tutti i ChangeRecord o
dall'utente termina la connessione. |
partition_token |
STRING |
Facoltativo (valore predefinito: NULL ) |
Specifica la partizione di modifiche in tempo reale su cui eseguire la query, in base alla
contenuto delle partizioni figlio
record. Se NULL o non specificato, significa che
Reader sta eseguendo una query sul flusso di modifiche per la prima volta e ha
non hai ottenuto token di partizione specifici da cui eseguire la query. |
heartbeat_milliseconds |
INT64 |
Obbligatorio | Determina la frequenza con cui viene restituito un ChangeRecord di heartbeat
nel caso in cui non siano presenti transazioni
confermate in questa partizione.
Il valore deve essere compreso tra 1,000 (un secondo) e 300,000 (cinque
minuti). |
read_options |
ARRAY |
Facoltativo (valore predefinito: NULL ) |
Opzioni di lettura aggiuntive riservate per l'uso futuro. Attualmente, l'unico valore consentito è NULL . |
Consigliamo di utilizzare un metodo pratico per creare il testo del tag di lettura della query di funzione e dei parametri di associazione, come mostrato di seguito esempio.
Java
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT ChangeRecord FROM READ_SingersNameStream" + "(" + " start_timestamp => @startTimestamp," + " end_timestamp => @endTimestamp," + " partition_token => @partitionToken," + " heartbeat_milliseconds => @heartbeatMillis" + ")"; // Helper method to conveniently create change stream query texts and bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("startTimestamp") .to(startTimestamp) .bind("endTimestamp") .to(endTimestamp) .bind("partitionToken") .to(partitionToken) .bind("heartbeatMillis") .to(heartbeatMillis) .build(); }
PostgreSQL
Per eseguire query sulle modifiche in tempo reale, puoi utilizzare
API ExecuteStreamingSql
.
Spanner crea automaticamente una funzione di lettura speciale
con le modifiche in tempo reale. La funzione di lettura fornisce accesso alla modifica
i record dello stream. La convenzione di denominazione delle funzioni di lettura
spanner.read_json_change_stream_name
.
Presumendo che nel database esista un flusso di modifiche SingersNameStream
,
la sintassi delle query per PostgreSQL è la seguente:
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
null
)
La funzione di lettura accetta i seguenti argomenti:
Nome argomento | Tipo | Obbligatorio? | Descrizione |
---|---|---|---|
start_timestamp |
timestamp with time zone |
Obbligatorio | Specifica che i record di modifica con commit_timestamp maggiore o uguale a start_timestamp
da restituire. Il valore deve essere compreso nel flusso di modifiche
di periodo di conservazione e deve essere inferiore o uguale all'ora corrente,
e maggiore o uguale al timestamp della creazione della modifica in tempo reale. |
end_timestamp |
timestamp with timezone |
Facoltativo (valore predefinito: NULL ) |
Specifica che i record delle modifiche con commit_timestamp
minore o uguale a end_timestamp deve
da restituire. Il valore deve rientrare nella conservazione delle modifiche in tempo reale
e maggiore o uguale al start_timestamp .
La query termina dopo aver restituito tutti i record di modifica fino a
end_timestamp o dopo che l'utente ha terminato la connessione.
Se NULL , la query viene eseguita finché non vengono restituiti tutti i record di modifica o l'utente non termina la connessione. |
partition_token |
text |
Facoltativo (valore predefinito: NULL ) |
Specifica la partizione di modifiche in tempo reale su cui eseguire la query, in base alla
contenuto delle partizioni figlio
record. Se è NULL o non è specificato, significa che il
lettore sta eseguendo una query sul flusso di modifiche per la prima volta e non ha ottenuto alcun token di partizione specifico da cui eseguire query. |
heartbeat_milliseconds |
bigint |
Obbligatorio | Determina la frequenza con cui verrà restituito un ChangeRecord heartbeat nel caso in cui non siano presenti transazioni committate in questa partizione.
Il valore deve essere compreso tra 1,000 (un secondo) e 300,000 (cinque
minuti). |
null |
null |
Obbligatorio | Riservato per un uso futuro |
Consigliamo di utilizzare un metodo pratico per creare il testo del tag la funzione di lettura e i parametri di associazione, come mostrato di seguito esempio.
Java
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT * FROM \"spanner\".\"read_json_SingersNameStream\"" + "($1, $2, $3, $4, null)"; // Helper method to conveniently create change stream query texts and bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("p1") .to(startTimestamp) .bind("p2") .to(endTimestamp) .bind("p3") .to(partitionToken) .bind("p4") .to(heartbeatMillis) .build(); }
Formato di record delle modifiche in tempo reale
GoogleSQL
La funzione di lettura degli stream di variazioni restituisce una singola colonna ChangeRecord
di tipo
ARRAY<STRUCT<...>>
. In ogni riga, questo array contiene sempre un singolo elemento.
Gli elementi dell'array hanno il seguente tipo:
STRUCT <
data_change_record ARRAY<STRUCT<...>>,
heartbeat_record ARRAY<STRUCT<...>>,
child_partitions_record ARRAY<STRUCT<...>>
>
Lo struct contiene tre campi: data_change_record
,
heartbeat_record
e child_partitions_record
, ciascuno del tipo
ARRAY<STRUCT<...>>
. In qualsiasi riga restituita dalla funzione di lettura dello stream di modifiche, solo uno di questi tre campi contiene un valore; gli altri due sono vuoti o NULL
. Questi campi di array contengono al massimo un elemento.
Le sezioni seguenti esaminano ciascuno di questi tre tipi di record.
PostgreSQL
La funzione di lettura degli stream di variazioni restituisce una singola colonna ChangeRecord
di tipo JSON
con la seguente struttura:
{
"data_change_record" : {},
"heartbeat_record" : {},
"child_partitions_record" : {}
}
Esistono tre possibili chiavi in questo oggetto: data_change_record
,
heartbeat_record
e child_partitions_record
, il valore corrispondente
il tipo è JSON
.
In ogni riga restituita dalla funzione di lettura delle modifiche in tempo reale, solo
esiste una di queste tre chiavi.
Le sezioni seguenti esaminano ciascuno di questi tre tipi di record.
Record delle modifiche dei dati
Un record delle modifiche ai dati contiene un insieme di modifiche a una tabella stesso tipo di modifica (inserimento, aggiornamento o eliminazione) eseguito con lo stesso di commit in una partizione di modifiche in tempo reale per lo stesso transazione. È possibile restituire più record di modifiche dei dati per lo stesso su più partizioni delle modifiche in tempo reale.
Tutti i record delle modifiche ai dati hanno commit_timestamp
, server_transaction_id
,
e record_sequence
, che insieme determinano l'ordine della modifica
"streaming" per un record di stream. Questi tre campi sono sufficienti per ricavare
l'ordine delle modifiche e fornire coerenza esterna.
Tieni presente che più transazioni possono avere lo stesso timestamp di commit se
toccano dati non sovrapposti. Il campo server_transaction_id
consente di distinguere gli insiemi di modifiche
tra le partizioni di modifiche in tempo reale) sono stati emessi all'interno dello stesso
transazione. Accoppiandolo con record_sequence
e
I campi number_of_records_in_transaction
consentono di eseguire il buffering e l'ordinamento
tutti i record di una determinata transazione.
I campi di un record di modifica dei dati includono quanto segue:
GoogleSQL
Campo | Tipo | Descrizione |
---|---|---|
commit_timestamp |
TIMESTAMP |
Timestamp in cui è stato eseguito il commit della modifica. |
record_sequence |
STRING |
Il numero di sequenza per il record all'interno della transazione. I numeri di sequenza sono garantiti
unici e in aumento monotonico (ma non necessariamente contigui) all'interno di una transazione. Ordina i record per lo stesso
server_transaction_id in base a record_sequence per
ricostruire l'ordine delle modifiche all'interno della transazione.
Questo ordinamento potrebbe essere ottimizzato da Spanner per prestazioni migliori e potrebbe non corrispondere sempre a quello originale fornito dagli utenti. |
server_transaction_id |
STRING |
Una stringa univoca a livello globale che rappresenta la transazione in di cui è stato eseguito il commit della modifica. Il valore deve essere utilizzato solo nel contesto dell'elaborazione dei record dello stream di modifiche e non è correlato all'ID transazione nell'API di Spanner. |
is_last_record_in_transaction_in_partition |
BOOL |
Indica se questo è l'ultimo record per una transazione nella partizione corrente. |
table_name |
STRING |
Nome della tabella interessata dalla modifica. |
value_capture_type |
STRING |
Descrive il tipo di acquisizione del valore specificato nel configurazione delle modifiche in tempo reale al momento dell'acquisizione di questa modifica. Il tipo di acquisizione del valore può essere |
column_types |
ARRAY<STRUCT< |
Il nome della colonna, il tipo di colonna se si tratta di una chiave primaria e la posizione della colonna definita nello schema ("ordinal_position"). La prima colonna di una tabella nello schema avrebbe la posizione ordinale "1". Il tipo di colonna possono essere nidificati per le colonne di array. Il formato corrisponde alla struttura del tipo descritto nel riferimento per l'API Spanner. |
mods |
ARRAY<STRUCT< |
Descrive le modifiche apportate, inclusa la chiave primaria
valori, i vecchi valori e i nuovi valori delle colonne modificate o monitorate.
La disponibilità e il contenuto dei valori vecchi e nuovi dipenderanno dal valore configurato value_capture_type. I campi new_values e old_values contengono solo le colonne non chiave. |
mod_type |
STRING |
Descrive il tipo di modifica. Uno dei valori INSERT , UPDATE o
DELETE . |
number_of_records_in_transaction |
INT64 |
Il numero di record di modifica dei dati che fanno parte di questa transazione in tutte le partizioni del flusso di modifiche. |
number_of_partitions_in_transaction |
INT64 |
Il numero di partizioni che restituirà i record delle modifiche dei dati per questa transazione. |
transaction_tag |
STRING |
Tag transazione associato a questa transazione. |
is_system_transaction |
BOOL |
Indica se la transazione è di sistema. |
PostgreSQL
Campo | Tipo | Descrizione |
---|---|---|
commit_timestamp |
STRING |
Il timestamp in cui è stato eseguito il commit della modifica. |
record_sequence |
STRING |
Il numero di sequenza per il record all'interno della transazione. I numeri di sequenza sono garantiti unici e in aumento monotonico (ma non necessariamente contigui) all'interno di una transazione. Ordina i record per lo stesso 'attributo "server_transaction_id" in base a "record_sequence" per ricostruire l'ordine delle modifiche all'interno della transaction. |
server_transaction_id |
STRING |
Una stringa univoca a livello globale che rappresenta la transazione in cui è stata eseguita la modifica. Il valore deve essere utilizzato solo nel contesto dell'elaborazione dei record dello stream di modifiche e non è correlato all'ID transazione nell'API di Spanner |
is_last_record_in_transaction_in_partition |
BOOLEAN |
Indica se questo è l'ultimo record per una transazione nella partizione corrente. |
table_name |
STRING |
Nome della tabella interessata dalla modifica. |
value_capture_type |
STRING |
Descrive il tipo di acquisizione del valore specificato nel configurazione delle modifiche in tempo reale al momento dell'acquisizione di questa modifica. Il tipo di acquisizione del valore può essere |
column_types |
[ { "name": <STRING>, "type": { "code": <STRING> }, "is_primary_key": <BOOLEAN>, "ordinal_position": <NUMBER> }, ... ] |
Il nome della colonna, il tipo di colonna se si tratta di una chiave primaria e la posizione della colonna definita nello schema ("ordinal_position"). La prima colonna di una tabella nello schema avrebbe la posizione ordinale "1". Il tipo di colonna possono essere nidificati per le colonne di array. Il formato corrisponde alla struttura del tipo descritto nel riferimento per l'API Spanner. |
mods |
[ { "keys": {<STRING> : <STRING>}, "new_values": { <STRING> : <VALUE-TYPE>, [...] }, "old_values": { <STRING> : <VALUE-TYPE>, [...] }, }, [...] ] |
Descrive le modifiche apportate, inclusa la chiave primaria
valori, i vecchi valori e i nuovi valori dei valori
colonne. La disponibilità e il contenuto dei valori vecchi e nuovi dipenderanno
sul tipo configurato value_capture_type. I campi new_values e
old_values contengono solo le colonne non chiave.
|
mod_type |
STRING |
Descrive il tipo di modifica. Uno dei valori INSERT , UPDATE o
DELETE . |
number_of_records_in_transaction |
INT64 |
Il numero di record di modifica dei dati che fanno parte di questa transazione in tutte le partizioni del flusso di modifiche. |
number_of_partitions_in_transaction |
NUMBER |
Il numero di partizioni che restituiranno i record di variazione dei dati per questa transazione. |
transaction_tag |
STRING |
Tag transazione associato a questa transazione. |
is_system_transaction |
BOOLEAN |
Indica se la transazione è di sistema. |
Segue un paio di record di modifiche dei dati di esempio. Descrivono una singola transazione in cui è presente e trasferire dati tra due account. Tieni presente che i due account si trovano in partizioni di stream di modifiche separate.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z",
"Balance": 1500
},
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
"record_sequence": "00000001",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id2"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 2000
},
"old_values": {
"LastUpdate": "2022-01-20T11:25:00.199915Z",
"Balance": 1500
},
},
...
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
Il seguente record di modifica dei dati è un esempio di record con il tipo di acquisizione del valore "NEW_VALUES"
. Tieni presente che vengono compilati solo i nuovi valori.
È stata modificata solo la colonna "LastUpdate"
, quindi solo quella colonna
è stato restituito.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z"
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
Il seguente record di modifiche dei dati è un esempio di record con il valore
tipo di acquisizione "NEW_ROW"
. È stata modificata solo la colonna "LastUpdate"
, ma vengono restituite tutte le colonne monitorate.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
Il seguente record di modifica dei dati è un esempio di record con il tipo di acquisizione del valore "NEW_ROW_AND_OLD_VALUES"
. Solo "LastUpdate"
è stata modificata, ma vengono restituite tutte le colonne monitorate. Questa acquisizione del valore
acquisisce il nuovo valore e quello precedente di LastUpdate
.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z"
}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW_AND_OLD_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
Record di heartbeat
Quando viene restituito un record di heartbeat, questo indica che tutte le modifiche
commit_timestamp
inferiore o uguale al record di heartbeat
Sono stati restituiti timestamp
e record di dati futuri in questo
deve avere timestamp di commit più alti di quelli restituiti
un record di heartbeat. I record di heartbeat vengono restituiti quando non sono disponibili dati
modifiche scritte su una partizione. Quando sono presenti modifiche ai dati
la partizione, puoi usare data_change_record.commit_timestamp
di heartbeat_record.timestamp
per indicare che il lettore sta progredendo
l'avanzamento della lettura della partizione.
Puoi usare record di heartbeat restituiti sulle partizioni per eseguire la sincronizzazione
lettori in tutte le partizioni. Una volta che tutti i lettori hanno ricevuto un
un heartbeat maggiore o uguale a un timestamp A
oppure ha ricevuto dati o elementi figlio
di partizione maggiore o uguale al timestamp A
, i lettori sanno di aver ricevuto
tutti i record impegnati a partire da quel timestamp A
e possono iniziare
elaborare i record presenti nel buffer, ad esempio ordinare la partizione
record per timestamp e raggrupparli per server_transaction_id
.
Un record di heartbeat contiene un solo campo:
GoogleSQL
Campo | Tipo | Descrizione |
---|---|---|
timestamp |
TIMESTAMP |
Timestamp del record di heartbeat. |
PostgreSQL
Campo | Tipo | Descrizione |
---|---|---|
timestamp |
STRING |
Il timestamp del record heartbeat. |
Un record di heartbeat di esempio, in cui tutti i record con timestamp è stato restituito un valore minore o uguale al timestamp di questo record:
heartbeat_record: {
"timestamp": "2022-09-27T12:35:00.312486Z"
}
Record di partizioni secondarie
Un record di partizioni figlio restituisce informazioni sulle partizioni figlio: i token di partizione, i token delle partizioni principali e
start_timestamp
che rappresenta il primo timestamp che l'account secondario
contengono record di modifiche. Record con timestamp di commit
sono immediatamente precedenti a child_partitions_record.start_timestamp
sono
restituito nella partizione corrente. Dopo aver restituito tutti i
partizioni figlio per questa partizione, questa query restituirà con
Uno stato di operazione riuscita, che indica che tutti i record sono stati restituiti per questo
della partizione di testo.
I campi di un record delle partizioni secondarie includono quanto segue:
GoogleSQL
Campo | Tipo | Descrizione |
---|---|---|
start_timestamp |
TIMESTAMP |
I record di modifica dei dati restituiti dalle partizioni secondarie in questo record della partizione secondaria hanno un timestamp di commit maggiore o uguale a start_timestamp . Quando esegui una query su una partizione figlio,
specifica il token di partizione figlio e un valore start_timestamp maggiore o uguale a
child_partitions_token.start_timestamp . Tutti i record delle partizioni secondarie
restituiti da una partizione hanno lo stesso start_timestamp e il
timestamp rientra sempre tra start_timestamp
e end_timestamp specificati nella query. |
record_sequence |
STRING |
Una sequenza monotonica crescente
numero che può essere utilizzato per definire l'ordine dei
di partizioni figlio, quando sono presenti
record delle partizioni secondarie restituiti con lo stesso start_timestamp in un
particolare partizione. Il token di partizione,
start_timestamp e
record_sequence identifica in modo univoco un
partizioni figlio. |
child_partitions |
ARRAY<STRUCT< |
Restituisce un insieme di partizioni figlio e le informazioni associate. Include la stringa del token di partizione utilizzata per identificare l'elemento figlio partizionata nelle query, nonché i token del partizioni di Compute Engine. |
PostgreSQL
Campo | Tipo | Descrizione |
---|---|---|
start_timestamp |
STRING |
Record delle modifiche dei dati restituiti da un account bambino
le partizioni in questo record di partizioni figlio hanno un timestamp di commit
maggiore o uguale a start_timestamp . Quando invii una query a un account bambino
la partizione, la query deve specificare il token di partizione figlio e
start_timestamp maggiore o uguale a
child_partitions_token.start_timestamp . Tutte le partizioni figlio
i record restituiti da una partizione hanno lo stesso
start_timestamp e il timestamp è sempre compreso tra
gli attributi start_timestamp e la durata della query
end_timestamp .
|
record_sequence |
STRING |
Una sequenza monotonica crescente
numero che può essere utilizzato per definire l'ordine dei
di partizioni figlio, quando sono presenti
record delle partizioni secondarie restituiti con lo stesso start_timestamp in un
particolare partizione. Il token di partizione,
start_timestamp e
record_sequence identifica in modo univoco un
partizioni figlio. |
child_partitions |
[ { "token": <STRING>, "parent_partition_tokens": [<STRING>], }, [...] ] |
Restituisce un array di partizioni figlio e le informazioni associate. Include la stringa del token di partizione utilizzata per identificare l'elemento figlio partizionata nelle query, nonché i token del partizioni di Compute Engine. |
Di seguito è riportato un esempio di record di partizione figlio:
child_partitions_record: {
"start_timestamp": "2022-09-27T12:40:00.562986Z",
"record_sequence": "00000001",
"child_partitions": [
{
"token": "child_token_1",
// To make sure changes for a key is processed in timestamp
// order, wait until the records returned from all parents
// have been processed.
"parent_partition_tokens": ["parent_token_1", "parent_token_2"]
}
],
}
Flusso di lavoro delle query sulle modifiche in tempo reale
Esegui le query di modifiche in tempo reale utilizzando
ExecuteStreamingSql
, con una singola API
sola lettura
transazione e un
legato al timestamp elevato. Il cambiamento
la funzione di lettura del flusso consente di specificare start_timestamp
e
end_timestamp
per l'intervallo di tempo che ti interessa. Tutti i record delle modifiche
entro il periodo di conservazione sono accessibili tramite l'efficace
al timestamp associato.
Tutti gli altri
TransactionOptions
non sono validi per le query di modifiche in tempo reale. Inoltre,
Se il criterio TransactionOptions.read_only.return_read_timestamp
è impostato su true,
verrà restituito un valore speciale di kint64max - 1
in Transaction
che descrive la transazione, invece di una lettura valida
timestamp. Questo valore speciale deve essere ignorato e non utilizzato per
per le query successive.
Ogni query delle modifiche in tempo reale può restituire un numero qualsiasi di righe, ognuna contenente un record di modifiche dei dati, un record heartbeat o partizioni figlio record. Non è necessario impostare una scadenza per la richiesta.
Esempio:
Il flusso di lavoro delle query in streaming inizia con l'emissione della prima query sul flusso di modifiche specificando partition_token
e NULL
. La query deve specificare
la funzione di lettura per il flusso di modifiche, il timestamp di inizio e di fine di interesse e
l'intervallo del battito cardiaco. Quando il valore di end_timestamp
è NULL
, la query mantiene
restituendo modifiche ai dati fino al termine della partizione.
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:00Z",
end_timestamp => NULL,
partition_token => NULL,
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:00Z',
NULL,
NULL,
10000,
NULL
) ;
Elabora i record di dati di questa query finché i record di partizione figlio non vengono
restituito. Nell'esempio seguente, due record di partizione figlio e tre partizioni
vengono restituiti i token, quindi la query viene terminata. Record di partizione figlio da un
una query specifica condivide sempre lo stesso start_timestamp
.
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": 1000012389,
"child_partitions": [
{
"token": "child_token_1",
// Note parent tokens are null for child partitions returned
// from the initial change stream queries.
"parent_partition_tokens": [NULL]
}
{
"token": "child_token_2",
"parent_partition_tokens": [NULL]
}
],
}
child partitions record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": 1000012390,
"child_partitions": [
{
"token": "child_token_3",
"parent_partition_tokens": [NULL]
}
],
}
Per elaborare le modifiche future dopo il giorno 2022-05-01T09:00:01Z
, crea tre nuove query ed eseguile in parallelo. Insieme, le tre query restituiscono informazioni
modifiche ai dati per lo stesso intervallo di chiavi coperto dall'elemento principale. Imposta sempre start_timestamp
su start_timestamp
nello stesso record della partizione secondaria e utilizza lo stesso end_timestamp
e lo stesso intervallo di heartbeat per elaborare i record in modo coerente in tutte le query.
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_1",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_2",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_3",
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_1',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_2',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_3',
10000,
NULL
);
Dopo un po' di tempo, la query su child_token_2
termina dopo aver restituito un'altra query
di partizione figlio, questo record indica che verrà
copre le modifiche future per child_token_2
e child_token_3
a partire da
2022-05-01T09:30:15Z
. La query restituirà lo stesso identico record
child_token_3
, perché entrambe sono le partizioni padre del nuovo child_token_4
.
Per garantire un'elaborazione ordinata dei record di dati per una determinata chiave,
la query su child_token_4
deve essere avviata solo al termine di tutte le query principali,
che in questo caso sono child_token_2
e child_token_3
. Crea una sola query per ogni token della partizione secondaria. Il design del flusso di lavoro delle query deve designare un amministratore per attendere e pianificare la query su child_token_4
.
child partitions record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:30:15Z",
"record_sequence": 1000012389,
"child_partitions": [
{
"token": "child_token_4",
"parent_partition_tokens": [child_token_2, child_token_3],
}
],
}
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream(
start_timestamp => "2022-05-01T09:30:15Z",
end_timestamp => NULL,
partition_token => "child_token_4",
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:30:15Z',
NULL,
'child_token_4',
10000,
NULL
);
Puoi trovare esempi di gestione e analisi dei record dei flussi di variazioni nel connettore Apache Beam SpannerIO Dataflow su GitHub.