#!/usr/bin/env python
"""Process raw INSEE csv files (states, departments, towns, statistics).

The script loads the four source files, derives a few lookup tables and
flattens the wide statistics table into one record per
(town, indicator, period).
"""
import argparse as arg
import io
import logging
import os
import re
import sys
import time

import numpy as np
import pandas as pd

# Root logger, shared by every helper below. It is created at import time so
# the helpers can be used (and tested) without running the script entry point.
logger = logging.getLogger()

# Matches attributes such as "P18_POP" or "D99_LOG": one letter, two digits.
_SINGLE_DATE_RE = re.compile(r'^[DP]([0-9]{2}).*$')
# Matches attributes such as "NAIS1318": letters followed by two 2-digit years.
_RANGE_DATE_RE = re.compile(r'^[A-Z]*([0-9]{2})([0-9]{2}).*$')

# Columns read from the statistics file.
STATISTICS_COLUMNS = [
    "CODGEO", "SUPERF", "P18_POP", "P13_POP", "P08_POP", "D99_POP",
    "NAIS1318", "NAIS0813", "NAIS9908", "NAIS9099", "NAIS8290", "DECE1318",
    "DECE0813", "DECE9908", "DECE9099", "DECE8290", "P18_LOG", "P13_LOG",
    "P08_LOG", "D99_LOG", "D90_LOG", "D82_LOG", "P18_LOGVAC", "P13_LOGVAC",
    "P08_LOGVAC", "D99_LOGVAC", "D90_LOGVAC", "D82_LOGVAC", "P18_RP",
    "P13_RP", "P08_RP", "D99_RP", "D90_RP", "D82_RP", "P18_RSECOCC",
    "P13_RSECOCC", "P08_RSECOCC", "D99_RSECOCC", "D90_RSECOCC",
    "D82_RSECOCC",
]


class CustomFormatter(logging.Formatter):
    """Formatter that wraps every record in an ANSI colour picked by level."""

    grey = "\x1b[0;35m"
    blue = "\x1b[34;20m"
    yellow = "\x1b[33;20m"
    red = "\x1b[31;20m"
    bold_red = "\x1b[31;1m"
    reset = "\x1b[0m"
    # Named ``fmt`` (not ``format``) so it is not shadowed by the method below.
    fmt = "%(levelname)s: %(message)s (%(filename)s:%(lineno)d)"

    FORMATS = {
        logging.DEBUG: blue + fmt + reset,
        logging.INFO: grey + fmt + reset,
        logging.WARNING: yellow + fmt + reset,
        logging.ERROR: red + fmt + reset,
        logging.CRITICAL: bold_red + fmt + reset,
    }

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Build one formatter per level once instead of one per record.
        self._formatters = {
            level: logging.Formatter(level_fmt)
            for level, level_fmt in self.FORMATS.items()
        }
        # Levels without an entry in FORMATS use logging's default format.
        self._fallback = logging.Formatter()

    def format(self, record):
        """Return *record* formatted with the colour of its level."""
        formatter = self._formatters.get(record.levelno, self._fallback)
        return formatter.format(record)


def parse_args(argv=None):
    """Parse the command line.

    :param argv: list of arguments to parse; ``None`` (the default) makes
        argparse read ``sys.argv``.
    :return: the populated ``argparse.Namespace``.
    """
    # The text is a description; passed positionally it would become ``prog``.
    parser = arg.ArgumentParser(description='Process csv files from INSEE')
    parser.add_argument('--source', '-s',
                        help='csv source directory',
                        default='csv')
    parser.add_argument('--export', '-e',
                        help='processeced csv directory',
                        default='exports')
    parser.add_argument('--towns',
                        help='town raw csv file (inside source follder)',
                        default='commune2021.csv')
    parser.add_argument('--departments',
                        help='departments raw csv file (inside source follder)',
                        default='departement2021.csv')
    parser.add_argument('--states',
                        help='states raw csv file (inside source follder)',
                        default='region2021.csv')
    parser.add_argument('--statistics',
                        help='statistics raw csv file to import',
                        default='statistiques.csv')
    debug_group = parser.add_mutually_exclusive_group()
    debug_group.add_argument('--verbose', '-V',
                             help='Verbose output',
                             action='store_true')
    debug_group.add_argument('--debug', '-d',
                             help='Activate debug mode',
                             action='store_true')
    return parser.parse_args(argv)


def _pad_region_code(code):
    """Return *code* unchanged when it is 2 characters long, else prefix '0'."""
    return code if len(str(code)) == 2 else f'0{code}'


def import_states_csv(raw_file):
    """Load the states file, keeping REG, NCC, LIBELLE and CHEFLIEU.

    :param raw_file: path or file-like object accepted by ``pd.read_csv``.
    :return: DataFrame whose REG values went through ``_pad_region_code``.
    """
    logger.info('import states from %s', raw_file)
    return pd.read_csv(raw_file,
                       usecols=["REG", "NCC", "LIBELLE", "CHEFLIEU"],
                       converters={'REG': _pad_region_code})


def import_department_csv(raw_file):
    """Load the departments file, keeping DEP, NCC, LIBELLE, REG, CHEFLIEU.

    :param raw_file: path or file-like object accepted by ``pd.read_csv``.
    :return: DataFrame whose REG values went through ``_pad_region_code``.
    """
    logger.info('import departments from %s', raw_file)
    return pd.read_csv(raw_file,
                       usecols=["DEP", "NCC", "LIBELLE", "REG", "CHEFLIEU"],
                       converters={'REG': _pad_region_code})


def import_towns_csv(raw_file):
    """Load the towns file and keep only rows whose TYPECOM is 'COM'.

    :param raw_file: path or file-like object accepted by ``pd.read_csv``.
    :return: DataFrame with the columns COM, NCC, LIBELLE and DEP.
    """
    logger.info('import town from %s', raw_file)
    # NOTE(review): COM/DEP are left to pandas' type inference; if the codes
    # must keep leading zeros in every case, consider dtype=str - confirm
    # against the real file.
    towns = pd.read_csv(raw_file,
                        usecols=["COM", "TYPECOM", "NCC", "LIBELLE", "DEP"])
    return towns.loc[towns['TYPECOM'] == 'COM',
                     ['COM', 'NCC', 'LIBELLE', 'DEP']]


def import_statistics_csv(raw_file):
    """Load the ';'-separated statistics file (STATISTICS_COLUMNS only).

    :param raw_file: path or file-like object accepted by ``pd.read_csv``.
    :return: DataFrame with one row per CODGEO.
    """
    logger.info('import statistics from %s', raw_file)
    return pd.read_csv(raw_file, usecols=STATISTICS_COLUMNS, sep=';')


def _expand_year(two_digits):
    """Expand a 2-digit year: a leading 0, 1 or 2 means 20xx, else 19xx."""
    century = 2000 if two_digits.startswith(('0', '1', '2')) else 1900
    return century + int(two_digits)


def get_single_date(attr):
    """Extract the year encoded in a single-date attribute such as 'P18_POP'.

    :param attr: column name starting with 'D' or 'P' and two digits.
    :return: ``(year, 'null')`` on success (the end date is the literal
        string ``'null'``), ``(None, None)`` when *attr* does not match.
    """
    logger.debug('get a date from %s', attr)
    found = _SINGLE_DATE_RE.match(attr)
    if found is None:
        # Same failure contract as get_range_date so callers can skip it.
        logger.error("Can't determine single date from %s", attr)
        return None, None
    return _expand_year(found.group(1)), 'null'


def get_range_date(attr):
    """Extract the two years encoded in a range attribute such as 'NAIS1318'.

    :param attr: column name made of upper-case letters and four digits.
    :return: ``(start_year, end_year)``, or ``(None, None)`` when *attr*
        does not match.
    """
    logger.debug('get date range from %s', attr)
    found = _RANGE_DATE_RE.match(attr)
    if found is None:
        logger.error('Error when trying to determine daterange from %s - %s',
                     attr, 'attribute does not match')
        return None, None
    years = []
    for two_digits in found.groups():
        logger.debug('Process two digits: %s', two_digits)
        years.append(_expand_year(two_digits))
    return years[0], years[1]


def build_statistics_records(statistics, indicators):
    """Flatten the wide statistics table into one record per value.

    Indicator codes starting with '_' are matched as column suffixes and
    parsed with get_single_date; the others are matched as column prefixes
    and parsed with get_range_date.

    :param statistics: DataFrame with a CODGEO column plus value columns.
    :param indicators: DataFrame with a 'code' column; its index supplies
        the indicator ids.
    :return: dict of equal-length lists keyed by com, id_indicateur,
        date_debut, date_fin and valeur. Records are ordered by row, then
        by indicator, then by column.
    """
    records = {"com": [], "id_indicateur": [], "date_debut": [],
               "date_fin": [], "valeur": []}

    # Column names do not change from one row to the next, so decide once
    # which columns belong to which indicator and which dates they carry.
    plan = []
    for indicator_id, code in indicators['code'].items():
        is_suffix = code.startswith('_')
        regex = code + '$' if is_suffix else '^' + code
        logger.debug('Process indicator %s', regex)
        for attribute in statistics.filter(regex=regex, axis=1).columns:
            logger.debug('check code: %s', code)
            if is_suffix:
                start, end = get_single_date(attribute)
            else:
                start, end = get_range_date(attribute)
            if start is None or end is None:
                logger.error('Can\'t process line, continue to next')
                continue
            plan.append((indicator_id, attribute, start, end))

    wanted = ['CODGEO'] + [attribute for _, attribute, _, _ in plan]
    rows = statistics[wanted].itertuples(index=False, name=None)
    for town, *values in rows:
        for (indicator_id, _, start, end), value in zip(plan, values):
            logger.debug(
                'town:%s, id_indic: %s, start: %s, end: %s, value:%s',
                town, indicator_id, start, end, value)
            records['com'].append(town)
            records['id_indicateur'].append(indicator_id)
            records['date_debut'].append(start)
            records['date_fin'].append(end)
            records['valeur'].append(value)
    return records


def _source_path(source_dir, filename, label):
    """Return the path of a source file, exiting with status 1 if missing."""
    path = os.path.join(source_dir, filename)
    if not os.path.exists(path):
        logger.critical("can't find source file for %s", label)
        sys.exit(1)
    return path


def main():
    """Script entry point: load the csv files and build the derived tables."""
    # Imported here rather than at module level so the helpers above stay
    # importable without the project-local ``timer`` package.
    from timer.Timer import Timer

    args = parse_args()

    # Console handler with colourised output.
    tty_handler = logging.StreamHandler()
    tty_handler.setFormatter(CustomFormatter())
    logger.addHandler(tty_handler)

    if args.verbose:
        logger.setLevel(logging.INFO)
        logger.info('VERBOSE mode activated')
    if args.debug:
        logger.setLevel(logging.DEBUG)
        logger.debug('DEBUG mode activated')

    t = Timer(logger=logger.info)

    t.start('Import_CSV')
    states = import_states_csv(
        _source_path(args.source, args.states, 'states'))
    t.stop()
    logger.debug(states)

    t.start('Import_CSV')
    departments = import_department_csv(
        _source_path(args.source, args.departments, 'departments'))
    t.stop()
    logger.debug(departments)

    t.start('Import_CSV')
    towns = import_towns_csv(
        _source_path(args.source, args.towns, 'towns'))
    t.stop()
    logger.debug(towns)

    t.start('Import_CSV')
    statistics = import_statistics_csv(
        _source_path(args.source, args.statistics, 'statistics'))
    t.stop()
    logger.debug(statistics)

    t.get_time_by_tag('Import_CSV')

    # Create missing table: indicators
    indicators = pd.DataFrame(
        {
            'indicateur': [
                'population',
                'naissances',
                'deces',
                'logements',
                'logements vacants',
                'residences principales',
                'residences secondaires et logements occasionnels',
            ],
            'code': ['_POP', 'NAIS', 'DECE', '_LOG', '_LOGVAC', '_RP',
                     '_RSECOCC'],
        },
        index=[1, 2, 3, 4, 5, 6, 7])
    logger.debug(indicators)

    # Create departments capitals
    dep_capitals = departments[['CHEFLIEU', 'DEP']].rename(
        columns={'CHEFLIEU': 'CHEFLIEUDEP'})
    departments = departments[["DEP", "NCC", "LIBELLE", "REG"]]
    logger.debug(dep_capitals)

    # Create states capitals
    states_capitals = states[['CHEFLIEU', 'REG']].rename(
        columns={'CHEFLIEU': 'CHEFLIEUREG'})
    # Trim the states table here; trimming ``departments`` instead would
    # drop its DEP column.
    states = states[["REG", "NCC", "LIBELLE"]]
    logger.debug(states_capitals)

    # Create statistics dataframe.
    # The timer is started unconditionally so that it is always balanced by
    # the stop() below.
    t.start()
    temp = build_statistics_records(statistics, indicators)
    c_stats = pd.DataFrame(temp,
                           columns=['com', 'id_indicateur', 'date_debut',
                                    'date_fin', 'valeur'])
    logger.debug(c_stats)
    t.stop()
    t.get_total_time()


if __name__ == '__main__':
    main()