Using to_csv after performing some ETL to a Google Cloud Bucket

  airflow, google-cloud-storage, python

I was wondering if anyone can help. I'm trying to read a CSV from a GCP bucket into a dataframe and then write the result to another bucket in the same project. With the method below my DAG runs, but I'm not getting any output in the designated bucket, and the DAG takes ages to run. Any insight on this issue?

import gcsfs
from airflow.operators import python_operator
from airflow import models
import pandas as pd
import logging
import csv
import datetime


fs = gcsfs.GCSFileSystem(project='project-goes-here')
with fs.open('gs://path/file.csv') as f:
    gas_data = pd.read_csv(f)


def make_csv():
    # Creates the CSV file with a datetime with no index, and adds the map, collection and collection address to the CSV
    # Calisto changed their mind on the position of where the conversion factor and multiplication factor should go
    gas_data['Asset collection'] = 'Distribution'
    gas_data['Asset collection address 1'] = 'Distribution'
    gas_data['Asset collection address 2'] = 'Units1+2 Central City'
    gas_data['Asset collection address 3'] = 'ind Est'
    gas_data['Asset collection city'] = 'Coventry'
    gas_data['Asset collection postcode'] = 'CV6 5RY'
    gas_data['Multiplication Factor'] = '1.000'
    gas_data['Conversion Factor'] = '1.022640'
    gas_data.to_csv('gs://path/'
                    'Clean_zenos_data_' + datetime.datetime.today().strftime('%m%d%Y%H%M%S') + '.csv',
                    index=False, quotechar='"', sep=',', quoting=csv.QUOTE_NONNUMERIC)
    logging.info('Added Map, Asset collection, Asset collection address and Saved CSV')


make_csv_function = python_operator.PythonOperator(
    task_id='make_csv',
    python_callable=make_csv
)
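
A possible restructuring, offered only as a sketch and not a confirmed fix: code at module level in a DAG file runs every time the scheduler parses the file, so the bucket read above may be part of why things feel slow. Moving the read into the callable defers it until the task actually runs. The parameter names `source_path`, `dest_prefix` and `now`, and the `CONSTANT_COLUMNS` dict, are hypothetical names introduced for illustration. The sketch assumes a pandas version that accepts `gs://` paths directly when gcsfs is installed; with local paths it behaves the same way.

```python
import csv
import datetime

import pandas as pd

# Constant columns copied from make_csv() in the question.
CONSTANT_COLUMNS = {
    'Asset collection': 'Distribution',
    'Asset collection address 1': 'Distribution',
    'Asset collection address 2': 'Units1+2 Central City',
    'Asset collection address 3': 'ind Est',
    'Asset collection city': 'Coventry',
    'Asset collection postcode': 'CV6 5RY',
    'Multiplication Factor': '1.000',
    'Conversion Factor': '1.022640',
}


def make_csv(source_path, dest_prefix, now=None):
    """Read source_path, add the constant columns, write a timestamped CSV.

    source_path and dest_prefix are hypothetical parameters (for example
    'gs://path/file.csv' and 'gs://path/'); `now` exists only so the
    timestamp can be fixed when testing.
    """
    now = now or datetime.datetime.today()
    # Reading here means the file is fetched when the task runs,
    # not when the DAG file is imported.
    gas_data = pd.read_csv(source_path)
    for column, value in CONSTANT_COLUMNS.items():
        gas_data[column] = value
    dest_path = (dest_prefix + 'Clean_zenos_data_'
                 + now.strftime('%m%d%Y%H%M%S') + '.csv')
    gas_data.to_csv(dest_path, index=False, quotechar='"', sep=',',
                    quoting=csv.QUOTE_NONNUMERIC)
    return dest_path
```

In the DAG file, the paths could be passed through the operator's `op_kwargs` argument. The operator in the question is also not attached to any DAG (no `dag=` argument and no enclosing `with models.DAG(...)` block), which may be worth checking as a reason the task never produces output.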

Source: Python Questions
