Transferts basés sur des événements

Le service de transfert de stockage peut écouter les notifications d'événements dans AWS ou Google Cloud pour transférer automatiquement les données qui ont été ajoutées ou mises à jour dans l'emplacement source. Les transferts basés sur des événements sont acceptés d'AWS S3 ou de Cloud Storage vers Cloud Storage.

Les transferts basés sur des événements écoutent les notifications d'événement Amazon S3 envoyé à Amazon SQS pour les sources AWS S3. Les sources Cloud Storage envoient des notifications à un abonnement Pub/Sub.

Avantages des transferts basés sur les événements

Comme les transferts basés sur des événements écoutent les modifications apportées au bucket source, sont copiées dans la destination en temps quasi réel. Le service de transfert de stockage n'a pas besoin d'exécuter une opération de listing sur la source, ce qui permet de gagner du temps et de l'argent.

Voici quelques cas d'utilisation :

  • Analyse basée sur les événements: répliquez les données d'AWS vers Cloud Storage pour effectuer des analyses et des traitements.

  • Replication Cloud Storage : activez la réplication automatique et asynchrone des objets entre les buckets Cloud Storage.

    Les transferts basés sur des événements avec le service de transfert de stockage diffèrent réplication Cloud Storage type en créant une copie de vos données dans un autre bucket.

    Cela offre les avantages suivants:

    • Conserver les données de développement et de production dans des espaces de noms distincts.
    • Partager des données sans donner accès au bucket d'origine
    • La sauvegarde sur un autre continent ou dans une zone non couverte par un stockage birégional et multirégional.
  • Configuration DR/HA: répliquez les objets de la source vers la destination de sauvegarde de minutes:

    • Sauvegarde sur plusieurs clouds: créez une copie de sauvegarde AWS S3 sur Cloud Storage.
    • Sauvegarde interrégionale ou inter-projets: créer une copie de Cloud Storage bucket situé dans une région ou un projet différent.
  • Migration à chaud: le transfert basé sur des événements peut permettre une migration avec un temps d'arrêt réduit, l'ordre de minutes des temps d'arrêt, en tant qu'étape de suivi d'un traitement par lot ponctuel la migration.

Configurer des transferts basés sur des événements à partir de Cloud Storage

Les transferts basés sur des événements à partir de Cloud Storage utilisent les notifications Pub/Sub pour savoir quand des objets du bucket source ont été modifiés ou ajouté. Les suppressions d'objets ne sont pas détectées. supprimer un objet à la source ne supprime pas l'objet associé dans le bucket de destination.

Configurer les autorisations

  1. Recherchez le nom de l'agent de service de service de transfert de stockage pour votre projet:

    1. Accédez à la page de référence googleServiceAccounts.get.

      Un panneau interactif s'affiche, intitulé Essayer cette méthode.

    2. Dans le panneau, sous Paramètres de requête, saisissez votre l'ID du projet associé. Le projet que vous spécifiez ici doit être celui que vous pour gérer le service de transfert de stockage, qui peut être différent de la source le projet d'un bucket.

    3. Cliquez sur Exécuter.

    L'adresse e-mail de votre agent de service est renvoyée en tant que valeur de accountEmail. Copiez cette valeur.

    L'adresse e-mail de l'agent de service utilise le format project-PROJECT_NUMBER@storage-transfer-service.iam.gserviceaccount.com.

  2. Attribuez le rôle Pub/Sub Subscriber à l'agent de service de service de transfert de stockage.

    Cloud Console

    Suivez les instructions fournies dans l'article Contrôler les accès via la console Google Cloud pour accorder le rôle Pub/Sub Subscriber au service de transfert de stockage. La peut être attribué au niveau du sujet, de l'abonnement ou du projet.

    CLI gcloud

    Suivez les instructions fournies dans l'article Définir une stratégie sur ajoutez la liaison suivante:

    {
      "role": "roles/pubsub.subscriber",
      "members": [
        "serviceAccount:project-PROJECT_NUMBER@storage-transfer-service.iam.gserviceaccount.com"
    }

Configurer Pub/Sub

  1. Assurez-vous que les conditions Prérequis d'utilisation Pub/Sub avec Cloud Storage.

  2. Configurez la notification Pub/Sub pour Cloud Storage:

    gcloud storage buckets notifications create gs://BUCKET_NAME --topic=TOPIC_NAME
  3. Créez un abonnement pull pour le sujet:

    gcloud pubsub subscriptions create SUBSCRIPTION_ID --topic=TOPIC_NAME --ack-deadline=300

Créer un job de transfert

Vous pouvez utiliser l'API REST ou la console Google Cloud pour créer un pipeline de transfert.

N'incluez pas d'informations sensibles, telles que des informations permettant d'identifier personnellement l'utilisateur. ou de sécurité dans le nom de votre job de transfert. Les noms de ressources peuvent être propagés vers les noms d'autres ressources Google Cloud et peuvent être exposés aux systèmes internes de Google, en dehors de votre projet.

Cloud Console

  1. Accédez à la page Créer une tâche de transfert dans la console Google Cloud.

    Accédez à Créer une tâche de transfert.

  2. Sélectionnez Cloud Storage comme source et destination.

  3. Pour le mode de planification, sélectionnez En fonction des événements, puis cliquez sur Étape suivante.

  4. Sélectionnez le bucket source pour ce transfert.

  5. Dans la section Flux d'événements, saisissez le nom de l'abonnement:

    projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID

  6. Vous pouvez éventuellement définir des filtres, puis cliquer sur Étape suivante.

  7. Sélectionnez le bucket de destination pour ce transfert.

  8. Si vous le souhaitez, saisissez les heures de début et de fin du transfert. Si vous ne spécifiez une heure, le transfert commencera immédiatement et s'exécutera jusqu'au arrêtée manuellement.

  9. Spécifiez les options de transfert. Vous trouverez plus d'informations sur le Créer des transferts

  10. Cliquez sur Créer.

Une fois créé, le job de transfert commence à s'exécuter et un écouteur d'événements attend des notifications sur l'abonnement Pub/Sub. La page "Détails du job" affiche une opération par heure et comprend des détails sur les données transférées pour chaque job.

REST

Pour créer un transfert basé sur des événements à l'aide de l'API REST, envoyez le code suivant : JSON sur le point de terminaison transferJobs.create:

transfer_job {
  "description": "YOUR DESCRIPTION",
  "status": "ENABLED",
  "projectId": "PROJECT_ID",
  "transferSpec" {
    "gcsDataSource" {
      "bucketName": "GCS_SOURCE_NAME"
    },
    "gcsDataSink": {
        "bucketName": "GCS_SINK_NAME"
    }
  }
  "eventStream" {
    "name": "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID",
    "eventStreamStartTime": "2022-12-02T01:00:00+00:00",
    "eventStreamExpirationTime": "2023-01-31T01:00:00+00:00"
  }
}

Les champs eventStreamStartTime et eventStreamExpirationTime sont facultatifs. Si l'heure de début est omise, le transfert démarre immédiatement. si la fin est omise, le transfert se poursuit jusqu'à ce qu'il soit arrêté manuellement.

Bibliothèques clientes

Go

Pour savoir comment installer et utiliser la bibliothèque cliente pour le service de transfert de stockage, consultez Bibliothèques clientes du service de transfert de stockage. Pour en savoir plus, consultez les API du service de transfert de stockage Go documentation de référence.

Pour vous authentifier auprès du service de transfert de stockage, configurez les identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.


func createEventDrivenGCSTransfer(w io.Writer, projectID string, gcsSourceBucket string, gcsSinkBucket string, pubSubId string) (*storagetransferpb.TransferJob, error) {
	// Your Google Cloud Project ID.
	// projectID := "my-project-id"

	// The name of the source GCS bucket.
	// gcsSourceBucket := "my-source-bucket"

	// The name of the GCS bucket to transfer objects to.
	// gcsSinkBucket := "my-sink-bucket"

	// The Pub/Sub topic to subscribe the event driven transfer to.
	// pubSubID := "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID"

	ctx := context.Background()
	client, err := storagetransfer.NewClient(ctx)
	if err != nil {
		return nil, fmt.Errorf("storagetransfer.NewClient: %w", err)
	}
	defer client.Close()

	req := &storagetransferpb.CreateTransferJobRequest{
		TransferJob: &storagetransferpb.TransferJob{
			ProjectId: projectID,
			TransferSpec: &storagetransferpb.TransferSpec{
				DataSource: &storagetransferpb.TransferSpec_GcsDataSource{
					GcsDataSource: &storagetransferpb.GcsData{BucketName: gcsSourceBucket}},
				DataSink: &storagetransferpb.TransferSpec_GcsDataSink{
					GcsDataSink: &storagetransferpb.GcsData{BucketName: gcsSinkBucket}},
			},
			EventStream: &storagetransferpb.EventStream{Name: pubSubId},
			Status:      storagetransferpb.TransferJob_ENABLED,
		},
	}
	resp, err := client.CreateTransferJob(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("failed to create transfer job: %w", err)
	}

	fmt.Fprintf(w, "Created an event driven transfer job from %v to %v subscribed to %v with name %v", gcsSourceBucket, gcsSinkBucket, pubSubId, resp.Name)
	return resp, nil
}

Java

Pour savoir comment installer et utiliser la bibliothèque cliente pour le service de transfert de stockage, consultez Bibliothèques clientes du service de transfert de stockage. Pour en savoir plus, consultez les API du service de transfert de stockage Java documentation de référence.

Pour vous authentifier auprès du service de transfert de stockage, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.


import com.google.storagetransfer.v1.proto.StorageTransferServiceClient;
import com.google.storagetransfer.v1.proto.TransferProto;
import com.google.storagetransfer.v1.proto.TransferTypes;

public class CreateEventDrivenGcsTransfer {
  public static void main(String[] args) throws Exception {
    // Your Google Cloud Project ID
    String projectId = "your-project-id";

    // The name of the GCS AWS bucket to transfer data from
    String gcsSourceBucket = "your-gcs-source-bucket";

    // The name of the GCS bucket to transfer data to
    String gcsSinkBucket = "your-gcs-sink-bucket";

    // The ARN of the PubSub queue to subscribe to
    String sqsQueueArn = "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID";

    createEventDrivenGcsTransfer(projectId, gcsSourceBucket, gcsSinkBucket, sqsQueueArn);
  }

  public static void createEventDrivenGcsTransfer(
      String projectId, String gcsSourceBucket, String gcsSinkBucket, String pubSubId)
      throws Exception {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources,
    // or use "try-with-close" statement to do this automatically.
    try (StorageTransferServiceClient storageTransfer = StorageTransferServiceClient.create()) {

      TransferTypes.TransferJob transferJob =
          TransferTypes.TransferJob.newBuilder()
              .setProjectId(projectId)
              .setTransferSpec(
                  TransferTypes.TransferSpec.newBuilder()
                      .setGcsDataSource(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSourceBucket))
                      .setGcsDataSink(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSinkBucket)))
              .setStatus(TransferTypes.TransferJob.Status.ENABLED)
              .setEventStream(TransferTypes.EventStream.newBuilder().setName(pubSubId).build())
              .build();

      TransferTypes.TransferJob response =
          storageTransfer.createTransferJob(
              TransferProto.CreateTransferJobRequest.newBuilder()
                  .setTransferJob(transferJob)
                  .build());

      System.out.println(
          "Created a transfer job between from "
              + gcsSourceBucket
              + " to "
              + gcsSinkBucket
              + " subscribed to "
              + pubSubId
              + " with name "
              + response.getName());
    }
  }
}

Node.js

Pour savoir comment installer et utiliser la bibliothèque cliente pour le service de transfert de stockage, consultez Bibliothèques clientes du service de transfert de stockage. Pour en savoir plus, consultez les API du service de transfert de stockage Node.js documentation de référence.

Pour vous authentifier auprès du service de transfert de stockage, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.


// Imports the Google Cloud client library
const {
  StorageTransferServiceClient,
} = require('@google-cloud/storage-transfer');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// The ID of the Google Cloud Platform Project that owns the job
// projectId = 'my-project-id'

// Google Cloud Storage source bucket name
// gcsSourceBucket = 'my-gcs-source-bucket'

// Google Cloud Storage destination bucket name
// gcsSinkBucket = 'my-gcs-destination-bucket'

// The subscription ID to a Pubsub queue to track
// pubsubId = 'projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID'

// Creates a client
const client = new StorageTransferServiceClient();

/**
 * Creates an event driven transfer that tracks a Pubsub subscription.
 */
async function createEventDrivenGcsTransfer() {
  const [transferJob] = await client.createTransferJob({
    transferJob: {
      projectId,
      status: 'ENABLED',
      transferSpec: {
        gcsDataSource: {
          bucketName: gcsSourceBucket,
        },
        gcsDataSink: {
          bucketName: gcsSinkBucket,
        },
      },
      eventStream: {
        name: pubsubId,
      },
    },
  });

  console.log(
    `Created an event driven transfer from '${gcsSourceBucket}' to '${gcsSinkBucket}' with name ${transferJob.name}`
  );
}

createEventDrivenGcsTransfer();

Python

Pour savoir comment installer et utiliser la bibliothèque cliente pour le service de transfert de stockage, consultez Bibliothèques clientes du service de transfert de stockage. Pour en savoir plus, consultez les API du service de transfert de stockage Python documentation de référence.

Pour vous authentifier auprès du service de transfert de stockage, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.


from google.cloud import storage_transfer


def create_event_driven_gcs_transfer(
    project_id: str,
    description: str,
    source_bucket: str,
    sink_bucket: str,
    pubsub_id: str,
):
    """Create an event driven transfer between two GCS buckets that tracks a PubSub subscription"""

    client = storage_transfer.StorageTransferServiceClient()

    # The ID of the Google Cloud Platform Project that owns the job
    # project_id = 'my-project-id'

    # A description of this job
    # description = 'Creates an event-driven transfer that tracks a pubsub subscription'

    # Google Cloud Storage source bucket name
    # source_bucket = 'my-gcs-source-bucket'

    # Google Cloud Storage destination bucket name
    # sink_bucket = 'my-gcs-destination-bucket'

    # The Pubsub Subscription ID to track
    # pubsub_id = 'projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID'

    transfer_job_request = storage_transfer.CreateTransferJobRequest(
        {
            "transfer_job": {
                "project_id": project_id,
                "description": description,
                "status": storage_transfer.TransferJob.Status.ENABLED,
                "transfer_spec": {
                    "gcs_data_source": {
                        "bucket_name": source_bucket,
                    },
                    "gcs_data_sink": {
                        "bucket_name": sink_bucket,
                    },
                },
                "event_stream": {
                    "name": pubsub_id,
                },
            },
        }
    )

    result = client.create_transfer_job(transfer_job_request)
    print(f"Created transferJob: {result.name}")

Configurer des transferts basés sur des événements à partir d'AWS S3

Les transferts basés sur des événements à partir d'AWS S3 utilisent les notifications d'Amazon Simple Queue (SQS) pour savoir quand des objets du bucket source ont été modifiés ou ajouté. Les suppressions d'objets ne sont pas détectées. supprimer un objet à la source ne supprime pas l'objet associé dans le bucket de destination.

Créer une file d'attente SQS

  1. Dans la console AWS, accédez à la page Service de file d'attente simple.

  2. Cliquez sur Créer une file d'attente.

  3. Saisissez un nom pour cette file d'attente.

  4. Dans la section Règle d'accès, sélectionnez Avancé. Un objet JSON est affiché:

     {
        "Version": "2008-10-17",
        "Id": "__default_policy_ID",
        "Statement": [
          {
            "Sid": "__owner_statement",
            "Effect": "Allow",
            "Principal": {
              "AWS": "01234567890"
            },
            "Action": [
              "SQS:*"
            ],
            "Resource": "arn:aws:sqs:us-west-2:01234567890:test"
          }
        ]
      }
      

    Les valeurs de AWS et Resource sont uniques pour chaque projet.

  5. Copiez vos valeurs spécifiques de AWS et Resource à partir du fichier JSON affiché dans l'extrait de code JSON suivant :

    {
      "Version": "2012-10-17",
      "Id": "example-ID",
      "Statement": [
        {
          "Sid": "example-statement-ID",
          "Effect": "Allow",
          "Principal": {
            "Service": "s3.amazonaws.com"
          },
          "Action": "SQS:SendMessage",
          "Resource": "RESOURCE",
          "Condition": {
            "StringEquals": {
              "aws:SourceAccount": "AWS"
            },
            "ArnLike": {
              "aws:SourceArn": "S3_BUCKET_ARN"
            }
          }
        }
      ]
    }

    Les valeurs des espaces réservés dans le fichier JSON précédent utilisent les éléments suivants : format:

    • AWS est une valeur numérique représentant votre Amazon Web Services. projet. Exemple :"aws:SourceAccount": "1234567890"
    • RESOURCE est un numéro de ressource Amazon (ARN) qui identifie cette file d'attente. Exemple : "Resource": "arn:aws:sqs:us-west-2:01234567890:test".
    • S3_BUCKET_ARN est un ARN qui identifie le bucket source. Exemple : "aws:SourceArn": "arn:aws:s3:::example-aws-bucket". Vous pouvez recherchez l'ARN d'un bucket dans l'onglet Propriétés de la page d'informations du bucket. dans la console AWS.
  6. Remplacez le fichier JSON affiché dans la section Access policy (Règle d'accès) par la valeur mise à jour. JSON ci-dessus.

  7. Cliquez sur Créer une file d'attente.

Une fois l'opération terminée, notez le nom de ressource Amazon (ARN) de la file d'attente. L'ARN comporte les le format suivant:

arn:aws:sqs:us-east-1:1234567890:event-queue"

Activer les notifications sur votre bucket S3

  1. Dans la console AWS, accédez à la page S3.

  2. Dans la liste Buckets, sélectionnez votre bucket source.

  3. Sélectionnez l'onglet Propriétés.

  4. Dans la section Notifications relatives à un événement, cliquez sur Créer une notification d'événement.

  5. Attribuez un nom à cet événement.

  6. Dans la section Types d'événements, sélectionnez Tous les événements de création d'objets.

  7. Pour Destination, sélectionnez File d'attente SQS, puis la file d'attente que vous avez créée pour ce transfert.

  8. Cliquez sur Enregistrer les modifications.

Configurer les autorisations

Suivez les instructions de la section Configurer l'accès à une source : Amazon S3 pour créer un ID de clé d'accès et une clé secrète, ou un rôle d'identité fédérée.

Remplacez le fichier JSON des autorisations personnalisées par le code suivant:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sqs:DeleteMessage",
                "sqs:ChangeMessageVisibility",
                "sqs:ReceiveMessage",
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::AWS_BUCKET_NAME",
                "arn:aws:s3:::AWS_BUCKET_NAME/*",
                "AWS_QUEUE_ARN"
            ]
        }
    ]
}

Une fois la création terminée, tenez compte des informations suivantes:

  • Pour un utilisateur, notez l'ID de clé d'accès et la clé secrète.
  • Pour un rôle d'identité fédérée, notez le nom ARN (Amazon Resource Name), dont le format est arn:aws:iam::AWS_ACCOUNT:role/ROLE_NAME

Créer un job de transfert

Vous pouvez utiliser l'API REST ou la console Google Cloud pour créer un pipeline de transfert.

Cloud Console

  1. Accédez à la page Créer un job de transfert dans la console Google Cloud.

    Accédez à Créer un job de transfert.

  2. Sélectionnez Amazon S3 comme type de source et Cloud Storage comme destination.

  3. Pour le mode de planification, sélectionnez En fonction des événements, puis cliquez sur Étape suivante.

  4. Saisissez le nom de votre bucket S3. Le nom du bucket est celui qui apparaît dans AWS Management Console. Exemple : my-aws-bucket.

  5. Sélectionnez votre méthode d'authentification et saisissez les informations demandées. que vous avez créée et notée dans la section précédente.

  6. Saisissez l'ARN de la file d'attente Amazon SQS que vous avez créée précédemment. Elle utilise le le format suivant:

    arn:aws:sqs:us-east-1:1234567890:event-queue"
    
  7. Si vous le souhaitez, définissez des filtres, puis cliquez sur Étape suivante.

  8. Sélectionnez le bucket Cloud Storage de destination et, éventuellement, le chemin d'accès.

  9. Si vous le souhaitez, saisissez les heures de début et de fin du transfert. Si vous ne spécifiez une heure, le transfert commencera immédiatement et s'exécutera jusqu'au manuellement.

  10. Spécifiez les options de transfert. Vous trouverez plus d'informations sur le Créer des transferts

  11. Cliquez sur Créer.

Une fois créé, le job de transfert commence à s'exécuter et un écouteur d'événements attend dans la file d'attente SQS. La page "Détails de la tâche" toutes les heures, et comprend les détails des données transférées pour chaque tâche.

REST

Pour créer un transfert basé sur des événements à l'aide de l'API REST, envoyez le code suivant : JSON sur le point de terminaison transferJobs.create:

transfer_job {
  "description": "YOUR DESCRIPTION",
  "status": "ENABLED",
  "projectId": "PROJECT_ID",
  "transferSpec" {
    "awsS3DataSource" {
      "bucketName": "AWS_SOURCE_NAME",
      "roleArn": "arn:aws:iam::1234567891011:role/role_for_federated_auth"
    },
    "gcsDataSink": {
        "bucketName": "GCS_SINK_NAME"
    }
  }
  "eventStream" {
    "name": "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue",
    "eventStreamStartTime": "2022-12-02T01:00:00+00:00",
    "eventStreamExpirationTime": "2023-01-31T01:00:00+00:00"
  }
}

Les champs eventStreamStartTime et eventStreamExpirationTime sont facultatifs. Si l'heure de début est omise, le transfert démarre immédiatement. si la fin est omise, le transfert se poursuit jusqu'à ce qu'il soit arrêté manuellement.

Bibliothèques clientes

Go

Pour savoir comment installer et utiliser la bibliothèque cliente pour le service de transfert de stockage, consultez la page Bibliothèques clientes du service de transfert de stockage. Pour en savoir plus, consultez les API du service de transfert de stockage Go documentation de référence.

Pour vous authentifier auprès du service de transfert de stockage, configurez les identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.


func createEventDrivenAWSTransfer(w io.Writer, projectID string, s3SourceBucket string, gcsSinkBucket string, sqsQueueARN string) (*storagetransferpb.TransferJob, error) {
	// Your Google Cloud Project ID.
	// projectID := "my-project-id"

	// The name of the source AWS S3 bucket.
	// s3SourceBucket := "my-source-bucket"

	// The name of the GCS bucket to transfer objects to.
	// gcsSinkBucket := "my-sink-bucket"

	// The Amazon Resource Name (ARN) of the AWS SNS queue to subscribe the event driven transfer to.
	// sqsQueueARN := "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue"

	// The AWS access key credential, should be accessed via environment variable for security
	awsAccessKeyID := os.Getenv("AWS_ACCESS_KEY_ID")

	// The AWS secret key credential, should be accessed via environment variable for security
	awsSecretKey := os.Getenv("AWS_SECRET_ACCESS_KEY")

	ctx := context.Background()
	client, err := storagetransfer.NewClient(ctx)
	if err != nil {
		return nil, fmt.Errorf("storagetransfer.NewClient: %w", err)
	}
	defer client.Close()

	req := &storagetransferpb.CreateTransferJobRequest{
		TransferJob: &storagetransferpb.TransferJob{
			ProjectId: projectID,
			TransferSpec: &storagetransferpb.TransferSpec{
				DataSource: &storagetransferpb.TransferSpec_AwsS3DataSource{
					AwsS3DataSource: &storagetransferpb.AwsS3Data{
						BucketName: s3SourceBucket,
						AwsAccessKey: &storagetransferpb.AwsAccessKey{
							AccessKeyId:     awsAccessKeyID,
							SecretAccessKey: awsSecretKey,
						}},
				},
				DataSink: &storagetransferpb.TransferSpec_GcsDataSink{
					GcsDataSink: &storagetransferpb.GcsData{BucketName: gcsSinkBucket}},
			},
			EventStream: &storagetransferpb.EventStream{Name: sqsQueueARN},
			Status:      storagetransferpb.TransferJob_ENABLED,
		},
	}
	resp, err := client.CreateTransferJob(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("failed to create transfer job: %w", err)
	}

	fmt.Fprintf(w, "Created an event driven transfer job from %v to %v subscribed to %v with name %v", s3SourceBucket, gcsSinkBucket, sqsQueueARN, resp.Name)
	return resp, nil
}

Java

Pour savoir comment installer et utiliser la bibliothèque cliente pour le service de transfert de stockage, consultez Bibliothèques clientes du service de transfert de stockage. Pour en savoir plus, consultez la documentation de référence de l'API Java du service de transfert de stockage.

Pour vous authentifier auprès du service de transfert de stockage, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.


import com.google.storagetransfer.v1.proto.StorageTransferServiceClient;
import com.google.storagetransfer.v1.proto.TransferProto;
import com.google.storagetransfer.v1.proto.TransferTypes;

public class CreateEventDrivenAwsTransfer {
  public static void main(String[] args) throws Exception {
    // Your Google Cloud Project ID
    String projectId = "your-project-id";

    // The name of the source AWS bucket to transfer data from
    String s3SourceBucket = "yourS3SourceBucket";

    // The name of the GCS bucket to transfer data to
    String gcsSinkBucket = "your-gcs-bucket";

    // The ARN of the SQS queue to subscribe to
    String sqsQueueArn = "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue";

    createEventDrivenAwsTransfer(projectId, s3SourceBucket, gcsSinkBucket, sqsQueueArn);
  }

  public static void createEventDrivenAwsTransfer(
      String projectId, String s3SourceBucket, String gcsSinkBucket, String sqsQueueArn)
      throws Exception {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources,
    // or use "try-with-close" statement to do this automatically.
    try (StorageTransferServiceClient storageTransfer = StorageTransferServiceClient.create()) {

      // The ID used to access your AWS account. Should be accessed via environment variable.
      String awsAccessKeyId = System.getenv("AWS_ACCESS_KEY_ID");

      // The Secret Key used to access your AWS account. Should be accessed via environment
      // variable.
      String awsSecretAccessKey = System.getenv("AWS_SECRET_ACCESS_KEY");

      TransferTypes.TransferJob transferJob =
          TransferTypes.TransferJob.newBuilder()
              .setProjectId(projectId)
              .setTransferSpec(
                  TransferTypes.TransferSpec.newBuilder()
                      .setAwsS3DataSource(
                          TransferTypes.AwsS3Data.newBuilder()
                              .setBucketName(s3SourceBucket)
                              .setAwsAccessKey(
                                  TransferTypes.AwsAccessKey.newBuilder()
                                      .setAccessKeyId(awsAccessKeyId)
                                      .setSecretAccessKey(awsSecretAccessKey))
                              .build())
                      .setGcsDataSink(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSinkBucket)))
              .setStatus(TransferTypes.TransferJob.Status.ENABLED)
              .setEventStream(TransferTypes.EventStream.newBuilder().setName(sqsQueueArn).build())
              .build();

      TransferTypes.TransferJob response =
          storageTransfer.createTransferJob(
              TransferProto.CreateTransferJobRequest.newBuilder()
                  .setTransferJob(transferJob)
                  .build());

      System.out.println(
          "Created a transfer job from "
              + s3SourceBucket
              + " to "
              + gcsSinkBucket
              + " subscribed to "
              + sqsQueueArn
              + " with name "
              + response.getName());
    }
  }
}

Node.js

Pour savoir comment installer et utiliser la bibliothèque cliente pour le service de transfert de stockage, consultez Bibliothèques clientes du service de transfert de stockage. Pour en savoir plus, consultez les API du service de transfert de stockage Node.js documentation de référence.

Pour vous authentifier auprès du service de transfert de stockage, configurez les identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.


// Imports the Google Cloud client library
const {
  StorageTransferServiceClient,
} = require('@google-cloud/storage-transfer');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// The ID of the Google Cloud Platform Project that owns the job
// projectId = 'my-project-id'

// AWS S3 source bucket name
// s3SourceBucket = 'my-s3-source-bucket'

// Google Cloud Storage destination bucket name
// gcsSinkBucket = 'my-gcs-destination-bucket'

// The ARN of the SQS queue to subscribe to
// sqsQueueArn = 'arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue'

// AWS Access Key ID. Should be accessed via environment variable for security.
// awsAccessKeyId = 'AKIA...'

// AWS Secret Access Key. Should be accessed via environment variable for security.
// awsSecretAccessKey = 'HEAoMK2.../...ku8'

// Creates a client
const client = new StorageTransferServiceClient();

/**
 * Creates an event driven transfer that tracks an SQS queue.
 */
async function createEventDrivenAwsTransfer() {
  const [transferJob] = await client.createTransferJob({
    transferJob: {
      projectId,
      status: 'ENABLED',
      transferSpec: {
        awsS3DataSource: {
          bucketName: s3SourceBucket,
          awsAccessKey: {
            accessKeyId: awsAccessKeyId,
            secretAccessKey: awsSecretAccessKey,
          },
        },
        gcsDataSink: {
          bucketName: gcsSinkBucket,
        },
      },
      eventStream: {
        name: sqsQueueArn,
      },
    },
  });

  console.log(
    `Created an event driven transfer from '${s3SourceBucket}' to '${gcsSinkBucket}' with name ${transferJob.name}`
  );
}

createEventDrivenAwsTransfer();

Python

Pour savoir comment installer et utiliser la bibliothèque cliente pour le service de transfert de stockage, consultez Bibliothèques clientes du service de transfert de stockage. Pour en savoir plus, consultez les API du service de transfert de stockage Python documentation de référence.

Pour vous authentifier auprès du service de transfert de stockage, configurez les identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.


from google.cloud import storage_transfer


def create_event_driven_aws_transfer(
    project_id: str,
    description: str,
    source_s3_bucket: str,
    sink_gcs_bucket: str,
    sqs_queue_arn: str,
    aws_access_key_id: str,
    aws_secret_access_key: str,
):
    """Create an event driven transfer between two GCS buckets that tracks an AWS SQS queue"""

    client = storage_transfer.StorageTransferServiceClient()

    # The ID of the Google Cloud Platform Project that owns the job
    # project_id = 'my-project-id'

    # A description of this job
    # description = 'Creates an event-driven transfer that tracks an SQS queue'

    # AWS S3 source bucket name
    # source_s3_bucket = 'my-s3-source-bucket'

    # Google Cloud Storage destination bucket name
    # sink_gcs_bucket = 'my-gcs-destination-bucket'

    # The ARN of the SQS queue to subscribe to
    # pubsub_id = 'arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue'

    # AWS Access Key ID. Should be accessed via environment variable for security purposes.
    # aws_access_key_id = 'AKIA...'

    # AWS Secret Access Key. Should be accessed via environment variable for security purposes.
    # aws_secret_access_key = 'HEAoMK2.../...ku8'

    transfer_job_request = storage_transfer.CreateTransferJobRequest(
        {
            "transfer_job": {
                "project_id": project_id,
                "description": description,
                "status": storage_transfer.TransferJob.Status.ENABLED,
                "transfer_spec": {
                    "aws_s3_data_source": {
                        "bucket_name": source_s3_bucket,
                        "aws_access_key": {
                            "access_key_id": aws_access_key_id,
                            "secret_access_key": aws_secret_access_key,
                        },
                    },
                    "gcs_data_sink": {
                        "bucket_name": sink_gcs_bucket,
                    },
                },
                "event_stream": {
                    "name": sqs_queue_arn,
                },
            },
        }
    )

    result = client.create_transfer_job(transfer_job_request)
    print(f"Created transferJob: {result.name}")