diff --git a/createdatabase.py b/createdatabase.py
deleted file mode 100755
index 7303474..0000000
--- a/createdatabase.py
+++ /dev/null
@@ -1,247 +0,0 @@
-#!/usr/bin/env python
-import sys
-import os
-import re
-# import time
-import logging
-import argparse as arg
-import psycopg2 as pg
-from psycopg2.extensions import AsIs
-
-from classes.CustomFormater import CustomFormatter
-from classes.Timer import Timer
-
-# Schema name is NAMEDATALEN-1 (PGSQL source code)
-# -> src/include/pg_config_manual.h
-def check_schema_name(arg_value, pat=re.compile(r"^[a-z0-9A-Z]{1,63}$")):
-    if not pat.match(arg_value):
-        raise ValueError
-    return arg_value
-
-
-def parse_args():
-    """
-    Parse arguments
-    """
-    parser = arg.ArgumentParser('Process csv files from INSEE')
-
-    parser.add_argument('--source', '-s',
-                        help='csv source directory',
-                        default='exports')
-
-    parser.add_argument('--connection-file', '-f',
-                        help='Postgresql connexion file',
-                        default='.pgconn'
-                        )
-
-    # As whe use AsIs function to not include '' in our sql queries for schema
-    # name, me mus ensure that it is not contains SQL special characters
-    # like comments --, final coma etc and avoid SQL injection
-    parser.add_argument('--schema-name',
-                        help='Database schema name',
-                        type=check_schema_name,
-                        default='insee'
-                        )
-
-    debug_group = parser.add_mutually_exclusive_group()
-    debug_group.add_argument('--verbose', '-V',
-                             help='Verbose output',
-                             action='store_true')
-    debug_group.add_argument('--debug', '-d',
-                             help='Activate debug mode',
-                             action='store_true')
-    return parser.parse_args()
-
-if __name__ == '__main__':
-    args = parse_args()
-    t = Timer()
-    #logging.basicConfig(level=logging.DEBUG)
-    logger = logging.getLogger()
-    tty_handler = logging.StreamHandler()
-
-    # create console handler with a higher log level
-    tty_handler.setFormatter(CustomFormatter())
-    logger.addHandler(tty_handler)
-
-    if args.verbose is True:
-        logger.setLevel(logging.INFO)
-        logger.info('VERBOSE mode activated')
-
-    if args.debug is True:
-        logger.setLevel(logging.DEBUG)
-        logger.debug('DEBUG mode activated')
-
-    logging.debug('Import pgsql connection file {}'.format(args.connection_file))
-    with open(args.connection_file) as cf:
-        pg_conn = cf.read()
-
-    t.start('Create database schema')
-    conn = pg.connect(pg_conn)
-    with conn.cursor() as curs:
-        curs.execute('CREATE SCHEMA IF NOT EXISTS %s', (AsIs(args.schema_name),))
-        curs.execute("""
-            CREATE TABLE IF NOT EXISTS %(schema)s.region (
-                id_region serial PRIMARY KEY,
-                reg VARCHAR(2) unique,
-                ncc VARCHAR(200),
-                libelle VARCHAR(200)
-            );""",
-            {'schema':AsIs(args.schema_name)})
-
-        curs.execute("""
-            CREATE TABLE IF NOT EXISTS %(schema)s.departement (
-                id_departement serial PRIMARY KEY,
-                dep VARCHAR(3) unique,
-                ncc VARCHAR(200),
-                libelle VARCHAR(200),
-                reg VARCHAR(2) REFERENCES %(schema)s.region(reg)
-            );""",
-            {'schema':AsIs(args.schema_name)})
-
-        curs.execute("""
-            CREATE TABLE IF NOT EXISTS %(schema)s.commune (
-                id_commune serial PRIMARY KEY,
-                com VARCHAR(5) unique,
-                ncc VARCHAR(200),
-                libelle VARCHAR(200),
-                dep VARCHAR(3) REFERENCES %(schema)s.departement(dep),
-                superf FLOAT CONSTRAINT SUPERFICIE_POSITIVE CHECK (superf > 0)
-            );""",
-            {'schema':AsIs(args.schema_name)})
-
-        curs.execute("""
-            CREATE TABLE IF NOT EXISTS %(schema)s.cheflieudepartement(
-                dep VARCHAR(3) REFERENCES %(schema)s.departement(dep),
-                cheflieudep VARCHAR(5) REFERENCES %(schema)s.commune(com),
-                PRIMARY KEY (dep, cheflieudep)
-            );""",
-            {'schema':AsIs(args.schema_name)})
-
-        curs.execute("""
-            CREATE TABLE IF NOT EXISTS %(schema)s.cheflieuregion(
-                reg VARCHAR(3) REFERENCES %(schema)s.region(reg),
-                cheflieureg VARCHAR(5) REFERENCES %(schema)s.commune(com),
-                PRIMARY KEY (reg, cheflieureg)
-            );""",
-            {'schema':AsIs(args.schema_name)})
-
-        curs.execute("""
-            CREATE TABLE IF NOT EXISTS %(schema)s.indicateur(
-                id_indicateur INTEGER PRIMARY KEY,
-                libelle VARCHAR(200)
-            );""",
-            {'schema':AsIs(args.schema_name)})
-
-        curs.execute("""
-            CREATE TABLE IF NOT EXISTS %(schema)s.statistique(
-                com VARCHAR(5) REFERENCES %(schema)s.commune(com),
-                id_indicateur INTEGER REFERENCES %(schema)s.indicateur(id_indicateur),
-                date_debut SMALLINT,
-                date_fin SMALLINT CONSTRAINT APRES_DATEDEBUT CHECK (date_fin > date_debut),
-                valeur float,
-                PRIMARY KEY (com,id_indicateur,date_debut)
-            );""",
-            {'schema':AsIs(args.schema_name)})
-
-        curs.execute("""
-            CREATE OR REPLACE VIEW %(schema)s.view_indicateur_dep AS
-            SELECT id_departement, d.dep, d.ncc, s.id_indicateur, s.date_debut, SUM(s.VALEUR)
-            FROM %(schema)s.departement d
-            INNER JOIN %(schema)s.commune c ON c.dep = d.dep
-            INNER JOIN %(schema)s.statistique s ON s.com = c.com
-            GROUP BY id_departement,d.dep, s.date_debut, d.ncc, s.id_indicateur
-            ORDER BY id_indicateur;""",
-            {'schema':AsIs(args.schema_name)})
-
-        curs.execute("""
-            CREATE OR REPLACE VIEW %(schema)s.view_indicateur_reg AS
-            SELECT id_region, r.reg, r.ncc, s.id_indicateur, s.date_debut, SUM(s.valeur)
-            FROM %(schema)s.region r
-            INNER JOIN %(schema)s.departement d ON d.reg = r.reg
-            INNER JOIN %(schema)s.commune c ON c.dep = d.dep
-            INNER JOIN %(schema)s.statistique s ON s.com = c.com
-            GROUP BY id_region, r.reg, s.date_debut, r.ncc, s.id_indicateur
-            ORDER BY id_indicateur;""",
-            {'schema':AsIs(args.schema_name)})
-
-        curs.execute("""
-            ALTER TABLE %(schema)s.region
-            ADD COLUMN IF NOT EXISTS population INT;
-            """,
-            {'schema':AsIs(args.schema_name)})
-
-        curs.execute("""
-            ALTER TABLE %(schema)s.departement
-            ADD COLUMN IF NOT EXISTS population INT;
-            """,
-            {'schema':AsIs(args.schema_name)})
-
-        curs.execute("""
-            CREATE OR REPLACE PROCEDURE %(schema)s.PRC_POP_REG_DEP()
-            LANGUAGE plpgsql
-            AS $$
-            DECLARE
-                REC RECORD;
-            BEGIN
-
-                FOR REC IN (SELECT id_indicateur, ncc, SUM valeur
-                            FROM %(schema)s.view_indicateur_dep v
-                            WHERE id_indicateur = 1
-                            AND date_debut = '2018') LOOP
-
-                    UPDATE %(schema)s.departement
-                    SET population = REC.valeur
-                    WHERE id_departement = REC.id_departement;
-
-                END LOOP;
-
-                FOR REC IN (SELECT reg, SUM(population) valeur
-                            FROM DEPARTEMENT d
-                            GROUP BY REG) LOOP
-
-                    UPDATE %(schema)s.region
-                    SET population = REC.VALEUR
-                    WHERE reg = REC.reg;
-
-                END LOOP;
-            END;
-            $$;""",
-            {'schema':AsIs(args.schema_name)})
-    conn.commit()
-    t.stop()
-
-    t.start('Import data from csv files')
-    with conn.cursor() as curs:
-        curs.execute('SET search_path TO {}'.format(args.schema_name))
-    csvs = {
-        'region': {'col': ('reg', 'ncc', 'libelle'), 'null':''},
-        'departement': {'col': ('dep', 'ncc', 'libelle', 'reg'), 'null': ''},
-        'commune': {'col': ('com', 'ncc', 'libelle', 'dep', 'superf'), 'null':''},
-        'cheflieudepartement': {'col': ('cheflieudep', 'dep'), 'null':''},
-        'cheflieuregion': {'col': ('cheflieureg', 'reg'), 'null':''},
-        'indicateur': {'col': ('id_indicateur', 'libelle'), 'null':''},
-        'statistique': {
-            'col': (
-                'com',
-                'id_indicateur',
-                'date_debut',
-                'date_fin',
-                'valeur'
-            ),
-            'null':'null'
-        }
-    }
-    for csv, params in csvs.items():
-        logging.debug('Process csv file : {} with {}'.format(csv, params))
-        with open( args.source + '/' + csv + '.csv') as f:
-            with conn.cursor() as curs:
-                curs.copy_from(
-                    f,
-                    csv,
-                    sep=',',
-                    columns=params['col'],
-                    null=params['null']
-                )
-    conn.commit()
-    t.stop()
-    conn.close()
diff --git a/csvprocess.py b/csvprocess.py
index c9c7040..d35ea7c 100755
--- a/csvprocess.py
+++ b/csvprocess.py
@@ -82,12 +82,6 @@ def import_towns_csv(raw_file):
     return towns.loc[towns['TYPECOM'] == 'COM', ['COM','NCC', 'LIBELLE', 'DEP']]
 
 
-def stats_convert(codegeo):
-    if len(codegeo) == 4:
-        return 0 + codegeo
-    if len(codegeo) == 6:
-        return codegeo[1:]
-    return codegeo
 
 def import_statistics_csv(raw_file):
     """
@@ -95,6 +89,7 @@ def import_statistics_csv(raw_file):
     """
     logger.info('import town from {}'.format(raw_file))
 
+    stats_convert= lambda x: x if len(str(x)) == 5 else f'0{x}'
     stats = pd.read_csv(raw_file,
         usecols=["CODGEO","SUPERF","P18_POP","P13_POP","P08_POP","D99_POP",
         "NAIS1318","NAIS0813","NAIS9908","NAIS9099","NAIS8290","DECE1318",
@@ -142,10 +137,10 @@ def get_range_date(attr):
     return date[1], date[2]
 
 
-def export_csv(dataframe, path, index = False):
+def export_csv(dataframe, path):
     logger.debug('export csv from panda dataframe')
     try:
-        dataframe.to_csv(path ,header = False, index = index)
+        dataframe.to_csv(path ,header = False, index= False)
     except Exception as e:
         logger.error(
             'Erro when exporting Dataframe to csvfile {}. \n{}'.format(
@@ -206,7 +201,6 @@ if __name__ == '__main__':
         logger.critical('can\'t find source file for statistics')
         sys.exit(1)
     statistics = import_statistics_csv(args.source + '/' + args.statistics)
-    statistics = statistics[statistics['CODGEO'].isin(towns['COM'])]
    t.stop()
     logger.debug(statistics)
 
@@ -234,7 +228,7 @@ if __name__ == '__main__':
     ## Create states capitals
     states_capitals = states[['CHEFLIEU','REG']]
     states_capitals.columns = ["CHEFLIEUREG","REG"]
-    states = states[[ "REG","NCC","LIBELLE"]]
+    departments = departments[["REG","NCC","LIBELLE"]]
     logger.debug(states_capitals)
 
     ## create statistics dataframes
@@ -259,14 +253,8 @@ if __name__ == '__main__':
             logger.debug('Process indicator {}'.format(regex))
             selection = srow.filter(regex=regex)
 
-
             for attribute, value in selection.items():
                 logger.debug('check code: {}'.format(irow['code']))
-
-                # If value is NaN, then do not add the line
-                if pd.isna(value):
-                    continue
-
                 if irow['code'].startswith('_'):
                     start,end = get_single_date(attribute)
                 else:
@@ -275,7 +263,7 @@ if __name__ == '__main__':
                 if start is None or end is None:
                     logger.error('Can\'t process line, continue to next')
                     continue
-
+
                 logger.debug(
                     'town:{}, id_indic: {}, start: {}, end: {}, value:{}'
                     .format(
@@ -295,7 +283,7 @@ if __name__ == '__main__':
     consolidated_stats = pd.DataFrame.from_dict(temp)
     t.stop()
 
-    indicators=indicators[['indicateur']]
+
    t.start('Process_town')
     towns = pd.merge(towns,
         statistics[['CODGEO', 'SUPERF']],
@@ -313,8 +301,8 @@ if __name__ == '__main__':
     export_csv(states, args.export + '/region.csv')
     export_csv(dep_capitals, args.export + '/cheflieudepartement.csv')
     export_csv(states_capitals, args.export + '/cheflieuregion.csv')
-    export_csv(indicators, args.export + '/indicateur.csv', True)
-    export_csv(consolidated_stats, args.export + '/statistique.csv')
+    export_csv(indicators, args.export + '/indicateur.csv')
+    export_csv(consolidated_stats, args.export + '/statistiques.csv')
     t.stop()
 
     t.get_times_by_tag()
diff --git a/requirement.txt b/requiremment.txt
similarity index 100%
rename from requirement.txt
rename to requiremment.txt