diff --git a/createdatabase.py b/createdatabase.py
new file mode 100755
index 0000000..7303474
--- /dev/null
+++ b/createdatabase.py
@@ -0,0 +1,247 @@
+#!/usr/bin/env python
+import sys
+import os
+import re
+# import time
+import logging
+import argparse as arg
+import psycopg2 as pg
+from psycopg2.extensions import AsIs
+
+from classes.CustomFormater import CustomFormatter
+from classes.Timer import Timer
+
+# Schema name length is limited to NAMEDATALEN-1 (see PostgreSQL source:
+# src/include/pg_config_manual.h)
+def check_schema_name(arg_value, pat=re.compile(r"^[a-z0-9A-Z]{1,63}$")):
+    if not pat.match(arg_value):
+        raise ValueError('invalid schema name: {}'.format(arg_value))
+    return arg_value
+
+
+def parse_args():
+    """
+    Parse arguments
+    """
+    parser = arg.ArgumentParser(description='Process csv files from INSEE')
+
+    parser.add_argument('--source', '-s',
+                        help='csv source directory',
+                        default='exports')
+
+    parser.add_argument('--connection-file', '-f',
+                        help='PostgreSQL connection file',
+                        default='.pgconn'
+                        )
+
+    # As we use the AsIs adapter so that the schema name is not quoted in our
+    # SQL queries, we must ensure it does not contain SQL special characters
+    # (comments --, trailing commas, etc.) and thus avoid SQL injection.
+    parser.add_argument('--schema-name',
+                        help='Database schema name',
+                        type=check_schema_name,
+                        default='insee'
+                        )
+
+    debug_group = parser.add_mutually_exclusive_group()
+    debug_group.add_argument('--verbose', '-V',
+                             help='Verbose output',
+                             action='store_true')
+    debug_group.add_argument('--debug', '-d',
+                             help='Activate debug mode',
+                             action='store_true')
+    return parser.parse_args()
+
+if __name__ == '__main__':
+    args = parse_args()
+    t = Timer()
+    #logging.basicConfig(level=logging.DEBUG)
+    logger = logging.getLogger()
+    tty_handler = logging.StreamHandler()
+
+    # console handler with custom formatting
+    tty_handler.setFormatter(CustomFormatter())
+    logger.addHandler(tty_handler)
+
+    if args.verbose is True:
+        logger.setLevel(logging.INFO)
+        logger.info('VERBOSE mode activated')
+
+    if args.debug is True:
+        logger.setLevel(logging.DEBUG)
+        logger.debug('DEBUG mode activated')
+
+    logging.debug('Import pgsql connection file {}'.format(args.connection_file))
+    with open(args.connection_file) as cf:
+        pg_conn = cf.read()
+
+    t.start('Create database schema')
+    conn = pg.connect(pg_conn)
+    with conn.cursor() as curs:
+        curs.execute('CREATE SCHEMA IF NOT EXISTS %s', (AsIs(args.schema_name),))
+        curs.execute("""
+            CREATE TABLE IF NOT EXISTS %(schema)s.region (
+                id_region serial PRIMARY KEY,
+                reg VARCHAR(2) unique,
+                ncc VARCHAR(200),
+                libelle VARCHAR(200)
+            );""",
+            {'schema': AsIs(args.schema_name)})
+
+        curs.execute("""
+            CREATE TABLE IF NOT EXISTS %(schema)s.departement (
+                id_departement serial PRIMARY KEY,
+                dep VARCHAR(3) unique,
+                ncc VARCHAR(200),
+                libelle VARCHAR(200),
+                reg VARCHAR(2) REFERENCES %(schema)s.region(reg)
+            );""",
+            {'schema': AsIs(args.schema_name)})
+
+        curs.execute("""
+            CREATE TABLE IF NOT EXISTS %(schema)s.commune (
+                id_commune serial PRIMARY KEY,
+                com VARCHAR(5) unique,
+                ncc VARCHAR(200),
+                libelle VARCHAR(200),
+                dep VARCHAR(3) REFERENCES %(schema)s.departement(dep),
+                superf FLOAT CONSTRAINT SUPERFICIE_POSITIVE CHECK (superf > 0)
+            );""",
+            {'schema': AsIs(args.schema_name)})
+
+        curs.execute("""
+            CREATE TABLE IF NOT EXISTS %(schema)s.cheflieudepartement(
+                dep VARCHAR(3) REFERENCES %(schema)s.departement(dep),
+                cheflieudep VARCHAR(5) REFERENCES %(schema)s.commune(com),
+                PRIMARY KEY (dep, cheflieudep)
+            );""",
+            {'schema': AsIs(args.schema_name)})
+
+        curs.execute("""
+            CREATE TABLE IF NOT EXISTS %(schema)s.cheflieuregion(
+                reg VARCHAR(3) REFERENCES %(schema)s.region(reg),
+                cheflieureg VARCHAR(5) REFERENCES %(schema)s.commune(com),
+                PRIMARY KEY (reg, cheflieureg)
+            );""",
+            {'schema': AsIs(args.schema_name)})
+
+        curs.execute("""
+            CREATE TABLE IF NOT EXISTS %(schema)s.indicateur(
+                id_indicateur INTEGER PRIMARY KEY,
+                libelle VARCHAR(200)
+            );""",
+            {'schema': AsIs(args.schema_name)})
+
+        curs.execute("""
+            CREATE TABLE IF NOT EXISTS %(schema)s.statistique(
+                com VARCHAR(5) REFERENCES %(schema)s.commune(com),
+                id_indicateur INTEGER REFERENCES %(schema)s.indicateur(id_indicateur),
+                date_debut SMALLINT,
+                date_fin SMALLINT CONSTRAINT APRES_DATEDEBUT CHECK (date_fin > date_debut),
+                valeur float,
+                PRIMARY KEY (com, id_indicateur, date_debut)
+            );""",
+            {'schema': AsIs(args.schema_name)})
+
+        curs.execute("""
+            CREATE OR REPLACE VIEW %(schema)s.view_indicateur_dep AS
+            SELECT id_departement, d.dep, d.ncc, s.id_indicateur, s.date_debut, SUM(s.valeur) AS valeur
+            FROM %(schema)s.departement d
+            INNER JOIN %(schema)s.commune c ON c.dep = d.dep
+            INNER JOIN %(schema)s.statistique s ON s.com = c.com
+            GROUP BY id_departement, d.dep, s.date_debut, d.ncc, s.id_indicateur
+            ORDER BY id_indicateur;""",
+            {'schema': AsIs(args.schema_name)})
+
+        curs.execute("""
+            CREATE OR REPLACE VIEW %(schema)s.view_indicateur_reg AS
+            SELECT id_region, r.reg, r.ncc, s.id_indicateur, s.date_debut, SUM(s.valeur) AS valeur
+            FROM %(schema)s.region r
+            INNER JOIN %(schema)s.departement d ON d.reg = r.reg
+            INNER JOIN %(schema)s.commune c ON c.dep = d.dep
+            INNER JOIN %(schema)s.statistique s ON s.com = c.com
+            GROUP BY id_region, r.reg, s.date_debut, r.ncc, s.id_indicateur
+            ORDER BY id_indicateur;""",
+            {'schema': AsIs(args.schema_name)})
+
+        curs.execute("""
+            ALTER TABLE %(schema)s.region
+            ADD COLUMN IF NOT EXISTS population INT;
+            """,
+            {'schema': AsIs(args.schema_name)})
+
+        curs.execute("""
+            ALTER TABLE %(schema)s.departement
+            ADD COLUMN IF NOT EXISTS population INT;
+            """,
+            {'schema': AsIs(args.schema_name)})
+
+        # Fill departement.population from indicator 1 (year 2018),
+        # then aggregate it per region.
+        curs.execute("""
+            CREATE OR REPLACE PROCEDURE %(schema)s.PRC_POP_REG_DEP()
+            LANGUAGE plpgsql
+            AS $$
+            DECLARE
+                REC RECORD;
+            BEGIN
+
+                FOR REC IN (SELECT id_departement, id_indicateur, ncc, valeur
+                            FROM %(schema)s.view_indicateur_dep v
+                            WHERE id_indicateur = 1
+                            AND date_debut = '2018') LOOP
+
+                    UPDATE %(schema)s.departement
+                    SET population = REC.valeur
+                    WHERE id_departement = REC.id_departement;
+
+                END LOOP;
+
+                FOR REC IN (SELECT reg, SUM(population) valeur
+                            FROM %(schema)s.departement d
+                            GROUP BY reg) LOOP
+
+                    UPDATE %(schema)s.region
+                    SET population = REC.valeur
+                    WHERE reg = REC.reg;
+
+                END LOOP;
+            END;
+            $$;""",
+            {'schema': AsIs(args.schema_name)})
+    conn.commit()
+    t.stop()
+
+    t.start('Import data from csv files')
+    with conn.cursor() as curs:
+        curs.execute('SET search_path TO {}'.format(args.schema_name))
+        csvs = {
+            'region': {'col': ('reg', 'ncc', 'libelle'), 'null': ''},
+            'departement': {'col': ('dep', 'ncc', 'libelle', 'reg'), 'null': ''},
+            'commune': {'col': ('com', 'ncc', 'libelle', 'dep', 'superf'), 'null': ''},
+            'cheflieudepartement': {'col': ('cheflieudep', 'dep'), 'null': ''},
+            'cheflieuregion': {'col': ('cheflieureg', 'reg'), 'null': ''},
+            'indicateur': {'col': ('id_indicateur', 'libelle'), 'null': ''},
+            'statistique': {
+                'col': (
+                    'com',
+                    'id_indicateur',
+                    'date_debut',
+                    'date_fin',
+                    'valeur'
+                ),
+                'null': 'null'
+            }
+        }
+        # Bulk load each csv file into the table of the same name with COPY
+        for csv, params in csvs.items():
+            logging.debug('Process csv file {} with {}'.format(csv, params))
+            with open(args.source + '/' + csv + '.csv') as f:
+                with conn.cursor() as curs:
+                    curs.copy_from(
+                        f,
+                        csv,
+                        sep=',',
+                        columns=params['col'],
+                        null=params['null']
+                    )
+    conn.commit()
+    t.stop()
+    conn.close()
diff --git a/csvprocess.py b/csvprocess.py
index d35ea7c..c9c7040 100755
--- a/csvprocess.py
+++ b/csvprocess.py
@@ -82,6 +82,12 @@ def import_towns_csv(raw_file):
     return towns.loc[towns['TYPECOM'] == 'COM', ['COM','NCC', 'LIBELLE', 'DEP']]
 
 
+def stats_convert(codegeo):
+    if len(codegeo) == 4:
+        return '0' + codegeo
+    if len(codegeo) == 6:
+        return codegeo[1:]
+    return codegeo
 
 def import_statistics_csv(raw_file):
     """
@@ -89,7 +95,6 @@
     """
     logger.info('import town from {}'.format(raw_file))
 
-    stats_convert= lambda x: x if len(str(x)) == 5 else f'0{x}'
     stats = pd.read_csv(raw_file,
         usecols=["CODGEO","SUPERF","P18_POP","P13_POP","P08_POP","D99_POP",
         "NAIS1318","NAIS0813","NAIS9908","NAIS9099","NAIS8290","DECE1318",
@@ -137,10 +142,10 @@ def get_range_date(attr):
     return date[1], date[2]
 
 
-def export_csv(dataframe, path):
+def export_csv(dataframe, path, index=False):
     logger.debug('export csv from panda dataframe')
     try:
-        dataframe.to_csv(path ,header = False, index= False)
+        dataframe.to_csv(path, header=False, index=index)
     except Exception as e:
         logger.error(
             'Erro when exporting Dataframe to csvfile {}. \n{}'.format(
@@ -201,6 +206,7 @@
         logger.critical('can\'t find source file for statistics')
         sys.exit(1)
     statistics = import_statistics_csv(args.source + '/' + args.statistics)
+    statistics = statistics[statistics['CODGEO'].isin(towns['COM'])]
     t.stop()
     logger.debug(statistics)
@@ -228,7 +234,7 @@
     ## Create states capitals
     states_capitals = states[['CHEFLIEU','REG']]
     states_capitals.columns = ["CHEFLIEUREG","REG"]
-    departments = departments[["REG","NCC","LIBELLE"]]
+    states = states[["REG","NCC","LIBELLE"]]
     logger.debug(states_capitals)
 
     ## create statistics dataframes
@@ -253,8 +259,14 @@
             logger.debug('Process indicator {}'.format(regex))
             selection = srow.filter(regex=regex)
+
             for attribute, value in selection.items():
                 logger.debug('check code: {}'.format(irow['code']))
+
+                # If the value is NaN, do not add the line
+                if pd.isna(value):
+                    continue
+
                 if irow['code'].startswith('_'):
                     start,end = get_single_date(attribute)
                 else:
@@ -263,7 +275,7 @@
                 if start is None or end is None:
                     logger.error('Can\'t process line, continue to next')
                     continue
-
+
                 logger.debug(
                     'town:{}, id_indic: {}, start: {}, end: {}, value:{}'
                     .format(
@@ -283,7 +295,7 @@
     consolidated_stats = pd.DataFrame.from_dict(temp)
     t.stop()
-
+
     indicators=indicators[['indicateur']]
     t.start('Process_town')
     towns = pd.merge(towns, statistics[['CODGEO', 'SUPERF']],
@@ -301,8 +313,8 @@
     export_csv(states, args.export + '/region.csv')
     export_csv(dep_capitals, args.export + '/cheflieudepartement.csv')
     export_csv(states_capitals, args.export + '/cheflieuregion.csv')
-    export_csv(indicators, args.export + '/indicateur.csv')
-    export_csv(consolidated_stats, args.export + '/statistiques.csv')
+    export_csv(indicators, args.export + '/indicateur.csv', True)
+    export_csv(consolidated_stats, args.export + '/statistique.csv')
     t.stop()
 
     t.get_times_by_tag()
diff --git a/requiremment.txt b/requirement.txt
similarity index 100%
rename from requiremment.txt
rename to requirement.txt