#!/usr/bin/env python
"""Turn raw INSEE CSV files into headerless CSV files, one per table.

The script reads four raw files (regions, departments, towns, statistics)
from a source directory, reshapes them and writes seven CSV files
(towns, departments, regions, department capitals, region capitals,
indicators, statistics) into an export directory.
"""
import argparse as arg
import io
import logging
import os
import re
import sys
import time

import numpy as np
import pandas as pd

# Root logger, shared by every helper below.  It is bound at import time so
# that the helpers also work when this module is imported instead of being
# run as a script; handlers and level are configured in main().
logger = logging.getLogger()

# Column name starting with D or P followed by a two-digit year (P18_POP).
_SINGLE_DATE_RE = re.compile(r'^[DP]([0-9]{2}).*$')
# Capital letters followed by two two-digit years (NAIS1318 -> 13 and 18).
_RANGE_DATE_RE = re.compile(r'^[A-Z]*([0-9]{2})([0-9]{2}).*$')

# Columns read from the raw statistics file.
_STATISTICS_COLUMNS = [
    "CODGEO", "SUPERF", "P18_POP", "P13_POP", "P08_POP", "D99_POP",
    "NAIS1318", "NAIS0813", "NAIS9908", "NAIS9099", "NAIS8290", "DECE1318",
    "DECE0813", "DECE9908", "DECE9099", "DECE8290", "P18_LOG", "P13_LOG",
    "P08_LOG", "D99_LOG", "D90_LOG", "D82_LOG", "P18_LOGVAC", "P13_LOGVAC",
    "P08_LOGVAC", "D99_LOGVAC", "D90_LOGVAC", "D82_LOGVAC", "P18_RP",
    "P13_RP", "P08_RP", "D99_RP", "D90_RP", "D82_RP", "P18_RSECOCC",
    "P13_RSECOCC", "P08_RSECOCC", "D99_RSECOCC", "D90_RSECOCC",
    "D82_RSECOCC",
]


def parse_args(argv=None):
    """
    Parse command line arguments.

    :param argv: optional list of arguments; when None, argparse falls back
        to the arguments of the running process.
    :return: the populated argparse namespace.
    """
    # The text is passed as ``description``: the first positional parameter
    # of ArgumentParser is ``prog`` and would replace the program name in
    # the usage line.
    parser = arg.ArgumentParser(description='Process csv files from INSEE')
    parser.add_argument('--source', '-s',
                        help='csv source directory',
                        default='csv')
    parser.add_argument('--export', '-e',
                        help='processeced csv directory',
                        default='exports')
    parser.add_argument('--towns',
                        help='town raw csv file (inside source follder)',
                        default='commune2021.csv')
    parser.add_argument('--departments',
                        help='departments raw csv file (inside source follder)',
                        default='departement2021.csv')
    parser.add_argument('--states',
                        help='states raw csv file (inside source follder)',
                        default='region2021.csv')
    parser.add_argument('--statistics',
                        help='statistics raw csv file to import',
                        default='statistiques.csv')
    debug_group = parser.add_mutually_exclusive_group()
    debug_group.add_argument('--verbose', '-V',
                             help='Verbose output',
                             action='store_true')
    debug_group.add_argument('--debug', '-d',
                             help='Activate debug mode',
                             action='store_true')
    return parser.parse_args(argv)


def _pad_region_code(value):
    """Prefix a region code with '0' unless it is two characters long."""
    return value if len(str(value)) == 2 else f'0{value}'


def import_states_csv(raw_file):
    """
    Read the raw states (regions) file.

    :param raw_file: path of the csv file to read.
    :return: DataFrame with the REG, NCC, LIBELLE and CHEFLIEU columns.
    """
    logger.info('import states from {}'.format(raw_file))
    return pd.read_csv(raw_file,
                       usecols=["REG", "NCC", "LIBELLE", "CHEFLIEU"],
                       converters={'REG': _pad_region_code})


def import_department_csv(raw_file):
    """
    Read the raw departments file.

    :param raw_file: path of the csv file to read.
    :return: DataFrame with the DEP, NCC, LIBELLE, REG and CHEFLIEU columns.
    """
    logger.info('import departments from {}'.format(raw_file))
    return pd.read_csv(raw_file,
                       usecols=["DEP", "NCC", "LIBELLE", "REG", "CHEFLIEU"],
                       converters={'REG': _pad_region_code})


def import_towns_csv(raw_file):
    """
    Read the raw towns file and keep only the rows whose TYPECOM is 'COM'.

    :param raw_file: path of the csv file to read.
    :return: DataFrame with the COM, NCC, LIBELLE and DEP columns.
    """
    logger.info('import town from {}'.format(raw_file))
    towns = pd.read_csv(raw_file,
                        usecols=["COM", "TYPECOM", "NCC", "LIBELLE", "DEP"])
    return towns.loc[towns['TYPECOM'] == 'COM',
                     ['COM', 'NCC', 'LIBELLE', 'DEP']]


def stats_convert(codegeo):
    """
    Normalise a CODGEO value read from the statistics file.

    A 4-character code gets a leading '0', a 6-character code loses its
    first character, anything else is returned unchanged.

    :param codegeo: the raw code, as a string.
    :return: the normalised code, as a string.
    """
    if len(codegeo) == 4:
        # String concatenation: adding the integer 0 raises a TypeError.
        return '0' + codegeo
    if len(codegeo) == 6:
        return codegeo[1:]
    return codegeo


def import_statistics_csv(raw_file):
    """
    Read the raw statistics file (';' separated).

    :param raw_file: path of the csv file to read.
    :return: DataFrame restricted to the statistics columns this script
        uses, with CODGEO normalised by stats_convert.
    """
    logger.info('import statistics from {}'.format(raw_file))
    return pd.read_csv(raw_file,
                       usecols=_STATISTICS_COLUMNS,
                       sep=';',
                       converters={'CODGEO': stats_convert})


def _expand_year(two_digits):
    """Expand a two-digit year: leading 0, 1 or 2 -> 20xx, otherwise 19xx."""
    century = 2000 if two_digits.startswith(('0', '1', '2')) else 1900
    return century + int(two_digits)


def get_single_date(attr):
    """
    Extract the year of a single-date column name such as 'P18_POP'.

    :param attr: the column name.
    :return: (year, 'null'), or (None, None) when the name does not start
        with D or P followed by two digits.
    """
    logger.debug('get a date from {}'.format(attr))
    m = _SINGLE_DATE_RE.match(attr)
    if m is None:
        logger.error('Cand determine single date from {}'.format(attr))
        return None, None
    # Single-date indicators have no end date; 'null' is written as is.
    return _expand_year(m.group(1)), 'null'


def get_range_date(attr):
    """
    Extract both years of a date-range column name such as 'NAIS1318'.

    :param attr: the column name.
    :return: (start year, end year), or (None, None) when the name does
        not contain two consecutive two-digit years after its letters.
    """
    logger.debug('get date range from {}'.format(attr))
    m = _RANGE_DATE_RE.match(attr)
    if m is None:
        logger.error(
            'Error when trying to determine daterange from {} - {}'.format(
                attr, 'name does not match the expected pattern'
            )
        )
        return None, None
    for digits in m.groups():
        logger.debug('Process two digits: {}'.format(digits))
    return _expand_year(m.group(1)), _expand_year(m.group(2))


def export_csv(dataframe, path, index=False):
    """
    Write a DataFrame to a headerless csv file; exit(1) on failure.

    :param dataframe: the DataFrame to write.
    :param path: destination file.
    :param index: whether the index is written as first column.
    """
    logger.debug('export csv from panda dataframe')
    try:
        dataframe.to_csv(path, header=False, index=index)
    except Exception as e:
        logger.error(
            'Erro when exporting Dataframe to csvfile {}. \n{}'.format(
                path, e)
        )
        sys.exit(1)


def build_indicators():
    """
    Build the indicators table, which has no raw source file.

    :return: DataFrame indexed 1..7 with the indicator label
        ('indicateur') and the column-name fragment identifying it ('code').
    """
    return pd.DataFrame(
        {
            'indicateur': [
                'population',
                'naissances',
                'deces',
                'logements',
                'logements vacants',
                'residences principales',
                'residences secondaires et logements occasionnels',
            ],
            'code': ['_POP', 'NAIS', 'DECE', '_LOG', '_LOGVAC', '_RP',
                     '_RSECOCC'],
        },
        index=[1, 2, 3, 4, 5, 6, 7],
    )


def consolidate_statistics(statistics, indicators):
    """
    Unpivot the wide statistics table into one row per town/indicator/date.

    :param statistics: DataFrame with a CODGEO column and one column per
        indicator and date.
    :param indicators: DataFrame whose index holds the indicator ids and
        whose 'code' column holds the column-name fragment; a code starting
        with '_' is matched as a suffix (single date), any other code as a
        prefix (date range).
    :return: DataFrame with the com, id_indicateur, date_debut, date_fin
        and valeur columns. NaN values produce no row.
    """
    # Which indicator and dates a column stands for does not depend on the
    # row, so it is worked out once instead of once per town.
    plan = []
    for indicator_id, code in indicators['code'].items():
        single_date = code.startswith('_')
        pattern = code + '$' if single_date else '^' + code
        logger.debug('Process indicator {}'.format(pattern))
        for column in statistics.filter(regex=pattern).columns:
            if single_date:
                start, end = get_single_date(column)
            else:
                start, end = get_range_date(column)
            if start is None or end is None:
                logger.error(
                    'Can\'t derive dates from column {}, skipping it'.format(
                        column)
                )
                continue
            plan.append((column, indicator_id, start, end))

    records = {
        "com": [],
        "id_indicateur": [],
        "date_debut": [],
        "date_fin": [],
        "valeur": [],
    }
    # Checked once: formatting a debug line per value is wasted work when
    # debug output is disabled.
    debug_enabled = logger.isEnabledFor(logging.DEBUG)
    for _, row in statistics.iterrows():
        town = row['CODGEO']
        for column, indicator_id, start, end in plan:
            value = row[column]
            # If value is NaN, then do not add the line
            if pd.isna(value):
                continue
            if debug_enabled:
                logger.debug(
                    'town:{}, id_indic: {}, start: {}, end: {}, value:{}'
                    .format(town, indicator_id, start, end, value)
                )
            records['com'].append(town)
            records['id_indicateur'].append(indicator_id)
            records['date_debut'].append(start)
            records['date_fin'].append(end)
            records['valeur'].append(value)
    return pd.DataFrame.from_dict(records)


def _configure_logging(args, formatter):
    """Attach a console handler and set the level asked on the command line."""
    tty_handler = logging.StreamHandler()
    tty_handler.setFormatter(formatter)
    logger.addHandler(tty_handler)
    if args.verbose:
        logger.setLevel(logging.INFO)
        logger.info('VERBOSE mode activated')
    if args.debug:
        logger.setLevel(logging.DEBUG)
        logger.debug('DEBUG mode activated')


def _load_source(timer, directory, filename, label, loader):
    """Time the loading of one raw file; exit(1) when the file is missing."""
    path = os.path.join(directory, filename)
    timer.start('Import_CSV')
    if not os.path.exists(path):
        logger.critical('can\'t find source file for {}'.format(label))
        sys.exit(1)
    frame = loader(path)
    timer.stop()
    logger.debug(frame)
    return frame


def main():
    """Run the whole import / transform / export pipeline, then exit."""
    # Project-local helpers are only needed to run the pipeline; importing
    # them here keeps the functions above importable without the
    # ``classes`` package.
    from classes.Timer import Timer
    from classes.CustomFormater import CustomFormatter

    args = parse_args()
    _configure_logging(args, CustomFormatter())

    t = Timer(logger=logger.info)

    states = _load_source(t, args.source, args.states, 'states',
                          import_states_csv)
    departments = _load_source(t, args.source, args.departments,
                               'departments', import_department_csv)
    towns = _load_source(t, args.source, args.towns, 'towns',
                         import_towns_csv)
    statistics = _load_source(t, args.source, args.statistics, 'statistics',
                              import_statistics_csv)
    # Drop the statistics of codes that are not in the towns table.
    statistics = statistics[statistics['CODGEO'].isin(towns['COM'])]
    logger.debug(statistics)

    t.get_time_by_tag('Import_CSV')

    # Create missing table : indicators
    indicators = build_indicators()
    logger.debug(indicators)

    # Create departments capitals
    dep_capitals = departments[['CHEFLIEU', 'DEP']].rename(
        columns={'CHEFLIEU': 'CHEFLIEUDEP'})
    departments = departments[["DEP", "NCC", "LIBELLE", "REG"]]
    logger.debug(dep_capitals)

    # Create states capitals
    states_capitals = states[['CHEFLIEU', 'REG']].rename(
        columns={'CHEFLIEU': 'CHEFLIEUREG'})
    states = states[["REG", "NCC", "LIBELLE"]]
    logger.debug(states_capitals)

    # Create statistics dataframe
    t.start('Process_Statistics')
    consolidated_stats = consolidate_statistics(statistics, indicators)
    t.stop()

    # The code column was only needed to match statistics columns.
    indicators = indicators[['indicateur']]

    t.start('Process_town')
    towns = pd.merge(towns,
                     statistics[['CODGEO', 'SUPERF']],
                     left_on=['COM'],
                     right_on=['CODGEO'],
                     how='left'
                     )[['COM', 'NCC', 'LIBELLE', 'DEP', 'SUPERF']]
    t.stop()
    logger.debug(towns)

    # Export all dataframes to csv
    t.start('Export_CSV')
    export_csv(towns, os.path.join(args.export, 'commune.csv'))
    export_csv(departments, os.path.join(args.export, 'departement.csv'))
    export_csv(states, os.path.join(args.export, 'region.csv'))
    export_csv(dep_capitals,
               os.path.join(args.export, 'cheflieudepartement.csv'))
    export_csv(states_capitals,
               os.path.join(args.export, 'cheflieuregion.csv'))
    # The index of the indicators table is the indicator id: export it.
    export_csv(indicators, os.path.join(args.export, 'indicateur.csv'), True)
    export_csv(consolidated_stats,
               os.path.join(args.export, 'statistique.csv'))
    t.stop()

    t.get_times_by_tag()
    t.get_total_time()
    sys.exit()


if __name__ == '__main__':
    main()