#!/usr/bin/env python
"""Process raw INSEE csv files (regions, departments, towns, statistics).

Reads the raw csv exports from INSEE, normalizes region codes, and
flattens the per-town statistics columns into a long (town, indicator,
date range, value) structure.
"""
import argparse as arg
import io
import logging
import os
import re
import sys
import time

import numpy as np
import pandas as pd

# Module-level logger: the import_* helpers log through it, so it must
# exist even when this file is imported rather than executed as a script.
# The __main__ block attaches the colored handler and sets the level.
logger = logging.getLogger(__name__)


class CustomFormatter(logging.Formatter):
    """Logging formatter that colorizes records per level (ANSI escapes)."""

    grey = "\x1b[0;35m"
    blue = "\x1b[34;20m"
    yellow = "\x1b[33;20m"
    red = "\x1b[31;20m"
    bold_red = "\x1b[31;1m"
    reset = "\x1b[0m"
    # Renamed from `format` so the attribute does not shadow the method name.
    fmt = "%(levelname)s: %(message)s (%(filename)s:%(lineno)d)"

    FORMATS = {
        logging.DEBUG: blue + fmt + reset,
        logging.INFO: grey + fmt + reset,
        logging.WARNING: yellow + fmt + reset,
        logging.ERROR: red + fmt + reset,
        logging.CRITICAL: bold_red + fmt + reset,
    }

    def format(self, record):
        """Format *record* with the color matching its level."""
        log_fmt = self.FORMATS.get(record.levelno)
        return logging.Formatter(log_fmt).format(record)


def parse_args():
    """Parse command line arguments and return the populated namespace."""
    parser = arg.ArgumentParser('Process csv files from INSEE')
    parser.add_argument('--source', '-s', help='csv source directory',
                        default='csv')
    parser.add_argument('--export', '-e', help='processed csv directory',
                        default='exports')
    parser.add_argument('--towns',
                        help='town raw csv file (inside source folder)',
                        default='commune2021.csv')
    parser.add_argument('--departments',
                        help='departments raw csv file (inside source folder)',
                        default='departement2021.csv')
    parser.add_argument('--states',
                        help='states raw csv file (inside source folder)',
                        default='region2021.csv')
    parser.add_argument('--statistics',
                        help='statistics raw csv file to import',
                        default='statistiques.csv')
    debug_group = parser.add_mutually_exclusive_group()
    debug_group.add_argument('--verbose', '-V', help='Verbose output',
                             action='store_true')
    debug_group.add_argument('--debug', '-d', help='Activate debug mode',
                             action='store_true')
    return parser.parse_args()


def _pad_region_code(code):
    """Zero-pad an INSEE region code to two characters ('1' -> '01')."""
    return code if len(str(code)) == 2 else f'0{code}'


def import_states_csv(raw_file):
    """Read the raw states (regions) csv and return it as a DataFrame."""
    logger.info('import states from {}'.format(raw_file))
    return pd.read_csv(raw_file,
                       usecols=["REG", "NCC", "LIBELLE", "CHEFLIEU"],
                       converters={'REG': _pad_region_code})


def import_department_csv(raw_file):
    """Read the raw departments csv and return it as a DataFrame."""
    logger.info('import departments from {}'.format(raw_file))
    return pd.read_csv(raw_file,
                       usecols=["DEP", "NCC", "LIBELLE", "REG", "CHEFLIEU"],
                       converters={'REG': _pad_region_code})


def import_towns_csv(raw_file):
    """Read the raw towns csv; keep only actual communes (TYPECOM == 'COM')."""
    logger.info('import town from {}'.format(raw_file))
    towns = pd.read_csv(raw_file,
                        usecols=["COM", "TYPECOM", "NCC", "LIBELLE", "DEP"])
    return towns.loc[towns['TYPECOM'] == 'COM',
                     ['COM', 'NCC', 'LIBELLE', 'DEP']]


def import_statistics_csv(raw_file):
    """Read the raw statistics csv (semicolon separated) as a DataFrame."""
    logger.info('import town from {}'.format(raw_file))
    stats = pd.read_csv(
        raw_file,
        usecols=["CODGEO", "SUPERF", "P18_POP", "P13_POP", "P08_POP",
                 "D99_POP", "NAIS1318", "NAIS0813", "NAIS9908", "NAIS9099",
                 "NAIS8290", "DECE1318", "DECE0813", "DECE9908", "DECE9099",
                 "DECE8290", "P18_LOG", "P13_LOG", "P08_LOG", "D99_LOG",
                 "D90_LOG", "D82_LOG", "P18_LOGVAC", "P13_LOGVAC",
                 "P08_LOGVAC", "D99_LOGVAC", "D90_LOGVAC", "D82_LOGVAC",
                 "P18_RP", "P13_RP", "P08_RP", "D99_RP", "D90_RP", "D82_RP",
                 "P18_RSECOCC", "P13_RSECOCC", "P08_RSECOCC", "D99_RSECOCC",
                 "D90_RSECOCC", "D82_RSECOCC"],
        sep=';')
    return stats


def _two_digit_year(two):
    """Expand a two-digit year: '00'-'2x' -> 20xx, otherwise 19xx."""
    century = 2000 if two.startswith(('0', '1', '2')) else 1900
    return century + int(two)


def get_single_date(attr):
    """Extract the year from a single-date column name such as 'P18_POP'.

    Returns (year, 'null') on success, or (None, None) when no date can
    be found (the caller skips such columns).
    """
    logger.debug('get a date from {}'.format(attr))
    # [DP] fixed: the original class [D,P] also matched a literal comma.
    m = re.match(r'^[DP]([0-9]{2}).*$', attr)
    if m is None:
        # Original dereferenced m.group(1) before checking, crashing on a
        # non-matching attribute; now fail soft like get_range_date does.
        logger.error("Can't determine single date from {}".format(attr))
        return None, None
    return _two_digit_year(m.group(1)), 'null'


def get_range_date(attr):
    """Extract (start_year, end_year) from a range column like 'NAIS1318'.

    Returns (None, None) when the attribute name holds no date range.
    """
    logger.debug('get date range from {}'.format(attr))
    m = re.match(r'^[A-Z]*([0-9]{2})([0-9]{2}).*$', attr)
    if m is None:
        logger.error(
            "Error when trying to determine daterange from {}".format(attr))
        return None, None
    start, end = (_two_digit_year(two) for two in m.groups())
    return start, end


if __name__ == '__main__':
    args = parse_args()
    logger = logging.getLogger()
    tty_handler = logging.StreamHandler()
    # create console handler with a higher log level
    tty_handler.setFormatter(CustomFormatter())
    logger.addHandler(tty_handler)
    if args.verbose is True:
        logger.setLevel(logging.INFO)
        logger.info('VERBOSE mode activated')
    if args.debug is True:
        logger.setLevel(logging.DEBUG)
        logger.debug('DEBUG mode activated')

    if not os.path.exists(args.source + '/' + args.states):
        logger.critical('can\'t find source file for states')
        sys.exit(1)
    states = import_states_csv(args.source + '/' + args.states)
    logger.debug(states)

    if not os.path.exists(args.source + '/' + args.departments):
        logger.critical('can\'t find source file for departments')
        sys.exit(1)
    departments = import_department_csv(args.source + '/' + args.departments)
    logger.debug(departments)

    if not os.path.exists(args.source + '/' + args.towns):
        # fixed: the message previously said "departments"
        logger.critical('can\'t find source file for towns')
        sys.exit(1)
    towns = import_towns_csv(args.source + '/' + args.towns)
    logger.debug(towns)

    if not os.path.exists(args.source + '/' + args.statistics):
        logger.critical('can\'t find source file for statistics')
        sys.exit(1)
    statistics = import_statistics_csv(args.source + '/' + args.statistics)
    logger.debug(statistics)

    # Create missing table : indicators
    indicators = pd.DataFrame(
        {'indicateur': ['population', 'naissances', 'deces', 'logements',
                        'logements vacants', 'residences principales',
                        'residences secondaires et logements occasionnels'],
         'code': ['_POP', 'NAIS', 'DECE', '_LOG', '_LOGVAC', '_RP',
                  '_RSECOCC']},
        index=[1, 2, 3, 4, 5, 6, 7])
    logger.debug(indicators)

    ## Create departments capitals
    # .copy() so renaming columns does not operate on a slice view.
    dep_capitals = departments[['CHEFLIEU', 'DEP']].copy()
    dep_capitals.columns = ["CHEFLIEUDEP", "DEP"]
    departments = departments[["DEP", "NCC", "LIBELLE", "REG"]]
    logger.debug(dep_capitals)

    ## Create states capitals
    states_capitals = states[['CHEFLIEU', 'REG']].copy()
    states_capitals.columns = ["CHEFLIEUREG", "REG"]
    # fixed: this line previously reassigned `departments`, clobbering the
    # departments frame (and dropping its DEP column) instead of trimming
    # the states frame after its capital column was split off.
    states = states[["REG", "NCC", "LIBELLE"]]
    logger.debug(states_capitals)

    ## create statistics dataframes
    #
    # We need to first iterate on statistics
    if args.verbose or args.debug:  # fixed: was `arg.debug` (module alias)
        t_begin = time.time()
        logger.info('BEGIN - import stats')
    c_stats = pd.DataFrame(columns=['com', 'id_indicateur', 'date_debut',
                                    'date_fin', 'valeur'])
    temp = {"com": [], "id_indicateur": [], "date_debut": [],
            "date_fin": [], "valeur": []}
    # NOTE(review): `temp` is accumulated but never folded back into
    # `c_stats` before exit — presumably a pd.DataFrame(temp) step is
    # still to come; confirm intended output.
    for s_index, srow in statistics.iterrows():
        for index, irow in indicators.iterrows():
            # Codes starting with '_' are suffixes (e.g. P18_POP),
            # others are prefixes (e.g. NAIS1318).
            if irow['code'].startswith('_'):
                regex = irow['code'] + '$'
            else:
                regex = '^' + irow['code']
            logger.debug('Process indicator {}'.format(regex))
            selection = srow.filter(regex=regex)
            for attribute, value in selection.items():
                logger.debug('check code: {}'.format(irow['code']))
                if irow['code'].startswith('_'):
                    start, end = get_single_date(attribute)
                else:
                    start, end = get_range_date(attribute)
                if start is None or end is None:
                    logger.error('Can\'t process line, continue to next')
                    continue
                logger.debug(
                    'town:{}, id_indic: {}, start: {}, end: {}, value:{}'
                    .format(srow['CODGEO'], index, start, end, value))
                temp['com'].append(srow['CODGEO'])
                temp['id_indicateur'].append(index)
                temp['date_debut'].append(start)
                temp['date_fin'].append(end)
                temp['valeur'].append(value)
    if args.verbose or args.debug:  # fixed: was `arg.debug`
        t_end = time.time()
        logger.info('END stats import, time: {} seconds'.format(
            t_end - t_begin))
    sys.exit()