Last active
September 13, 2017 16:44
-
-
Save ewhitebloom/7c90b74d76aae5434466 to your computer and use it in GitHub Desktop.
ETL (extract, transform, load) of NPI data from the NPI CSV file into MySQL.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bufio" | |
"database/sql" | |
"fmt" | |
_ "github.com/go-sql-driver/mysql" | |
"os" | |
"reflect" | |
"strconv" | |
"strings" | |
"sync" | |
"time" | |
) | |
const rowCount float64 = 4636902 | |
var ( | |
taxonomyCodes = map[string]string{ | |
"282E00000X": "LongTermCareHospital", | |
"283X00000X": "AcuteRehabFacility", | |
"251G00000X": "Hospice", | |
"314000000X": "SkilledNursingFacility", | |
"251E00000X": "HomeHealthAgency", | |
} | |
medicareColumnIndices = GenerateMedicareColumnIndices() | |
NPIFilePath = os.Getenv("NPI_FILE") | |
MYSQLAUTH = os.Getenv("GO_MYSQLAUTH") | |
wg sync.WaitGroup | |
rowCounter float64 = 0 | |
) | |
// Provider holds the values of one NPI CSV row that are written to the
// care_providers and addresses tables. A nil pointer means no value is
// available: BuildProvider fills the CSV-derived fields, while ProviderType
// and MedicareProviderNumber are assigned by StoreProviderCSVRowToDB.
type Provider struct {
	Npi                    *int64
	ProviderType, Name     *string
	Street, Street2        *string
	City, State, Zip       *string
	Phone                  *string
	MedicareProviderNumber *string
}
// CareProviderRowCache keeps one (id, npi, type) row read from care_providers
// so it can be examined after the result set has been iterated. Fields are
// scanned through pointers, so a NULL column leaves the pointer nil. Field
// order matters: values are built with positional composite literals.
type CareProviderRowCache struct {
	databaseId, databaseNPI *int64
	databaseType            *string
}
// For new facilities, not already in the database, insert new rows into the care_providers table (one for each taxonomy of the provider), | |
// including their address and contact information. | |
func (p *Provider) StoreProviderCSVRowToDB(db *sql.DB, rowMedicareNumbers []string, rowTaxonomyProviderTypes []string) bool { | |
for _, rowTaxonomyN := range rowTaxonomyProviderTypes { | |
transaction, err := db.Begin() | |
if err != nil { | |
fmt.Println(err.Error()) | |
return false | |
} | |
if transaction != nil { | |
if len(rowMedicareNumbers) > 0 { | |
p.MedicareProviderNumber = &rowMedicareNumbers[0] | |
} | |
p.ProviderType = &rowTaxonomyN | |
any, err := p.AddressAndTypeDuplicates(transaction) | |
if err != nil { | |
fmt.Println(err.Error()) | |
return false | |
} | |
if any { | |
transaction.Rollback() | |
continue | |
} | |
result, err := transaction.Exec("INSERT INTO care_providers (npi, type, name, phone_number, medicare_provider_number, onboarding_stage, updated_at, created_at) VALUES (?,?,?,?,?,'red',NOW(),NOW())", *p.Npi, *p.ProviderType, *p.Name, NilorValueString(p.Phone), NilorValueString(p.MedicareProviderNumber)) | |
if err != nil { | |
transaction.Rollback() | |
fmt.Println(err.Error()) | |
return false | |
} else if result != nil { | |
providerId, err := result.LastInsertId() | |
if err != nil || providerId == 0 { | |
transaction.Rollback() | |
fmt.Println(err.Error()) | |
return false | |
} | |
success := p.StoreProviderAddress(providerId, transaction) | |
if !success { | |
return false | |
} | |
err2 := transaction.Commit() | |
if err2 != nil { | |
fmt.Println(err2.Error()) | |
return false | |
} | |
} else { | |
transaction.Rollback() | |
return false | |
} | |
} | |
} | |
return true | |
} | |
//Extra duplication checking based on if there's an existing provider with same address and provider type. | |
func (p *Provider) AddressAndTypeDuplicates(transaction *sql.Tx) (bool, error) { | |
var rows *sql.Rows | |
var err error | |
if p.Street2 != nil { | |
rows, err = transaction.Query("SELECT addressed_id FROM addresses WHERE street=? AND street2=? AND city=? AND state=? AND zip_code=?", *p.Street, *p.Street2, *p.City, *p.State, *p.Zip) | |
} else { | |
rows, err = transaction.Query("SELECT addressed_id FROM addresses WHERE street=? AND street2 IS NULL AND city=? AND state=? AND zip_code=?", *p.Street, *p.City, *p.State, *p.Zip) | |
} | |
if err != nil { | |
transaction.Rollback() | |
fmt.Println(err.Error()) | |
return false, err | |
} | |
careProviderIds := []sql.NullInt64{} | |
for rows.Next() { | |
var addressed_id sql.NullInt64 | |
rows.Scan(&addressed_id) | |
careProviderIds = append(careProviderIds, addressed_id) | |
} | |
for _, careProviderId := range careProviderIds { | |
rows2, err2 := transaction.Query("SELECT * FROM care_providers WHERE id=? AND type=?", careProviderId, *p.ProviderType) | |
if err2 != nil { | |
rows2.Close() | |
transaction.Rollback() | |
fmt.Println(err2.Error()) | |
return false, err2 | |
} | |
if rows2.Next() { | |
rows2.Close() | |
transaction.Rollback() | |
return true, err2 | |
} | |
} | |
return false, err | |
} | |
// For Skilled Nursing and HomeHealth Agencies already in the database, | |
// if found by their medicare provider number and without an NPI number | |
// set their NPI number from the CSV row using algorithm | |
// if one doesn't already have that NPI number. | |
// If no providers are found by the medicare numbers, make a new provider. | |
func (p *Provider) UpdateOrCreateProvider(db *sql.DB, csvRow *[]string, rowMedicareNumbers []string, rowTaxonomyProviderTypes []string) bool { | |
transaction, err := db.Begin() | |
if transaction != nil { | |
if err != nil { | |
transaction.Rollback() | |
return false | |
} | |
rowNPI, _ := strconv.Atoi((*csvRow)[0]) | |
rowsCache := []CareProviderRowCache{} | |
// Find existing provider matches by medicare numbers or row npi. | |
databaseRows, err := transaction.Query("SELECT id, npi, type FROM care_providers WHERE medicare_provider_number IN (?) OR npi=?", strings.Join(rowMedicareNumbers, ","), rowNPI) | |
if err != nil { | |
transaction.Rollback() | |
return false | |
} | |
defer databaseRows.Close() | |
//Look for existing providers that already match CSV row NPI number. | |
for databaseRows.Next() { | |
var databaseId *int64 | |
var databaseNPI *int64 | |
var databaseType *string | |
err := databaseRows.Scan(&databaseId, &databaseNPI, &databaseType) | |
if err != nil || databaseId == nil { | |
transaction.Rollback() | |
return false | |
} | |
if int64(rowNPI) == *databaseNPI { | |
databaseRows.Close() | |
transaction.Rollback() | |
return true | |
} | |
rowsCache = append(rowsCache, CareProviderRowCache{databaseId, databaseNPI, databaseType}) | |
} | |
// If no providers match CSV row NPI number, | |
// assign that NPI number to a provider that does match row's provider types and medicare numbers. | |
for _, rowTaxonomyType := range rowTaxonomyProviderTypes { | |
for _, rowCached := range rowsCache { | |
if rowCached.databaseNPI == nil && rowTaxonomyType == *rowCached.databaseType { | |
_, err := transaction.Exec("UPDATE care_providers SET npi=?, updated_at=NOW() WHERE id=?", rowNPI, int(*rowCached.databaseId)) | |
if err != nil { | |
transaction.Rollback() | |
return false | |
} | |
transaction.Commit() | |
return true | |
} | |
} | |
} | |
transaction.Rollback() | |
//Otherwise, if no other criteria worked, make a new provider. | |
for { | |
if p.StoreProviderCSVRowToDB(db, rowMedicareNumbers, rowTaxonomyProviderTypes) { | |
return true | |
} | |
} | |
} else { | |
return false | |
} | |
} | |
func (p *Provider) StoreProviderAddress(providerId int64, transaction *sql.Tx) bool { | |
_, err := transaction.Exec("INSERT INTO addresses (addressed_id, addressed_type, street, street2, city, state, zip_code, created_at, updated_at) VALUES (?,'CareProvider',?,?,?,?,?,NOW(),NOW())", providerId, *p.Street, NilorValueString(p.Street2), *p.City, *p.State, *p.Zip) | |
if err != nil { | |
fmt.Println(err.Error()) | |
transaction.Rollback() | |
return false | |
} | |
return true | |
} | |
func (p *Provider) QualityDataProvider() bool { | |
if p.Name == nil || p.Npi == nil || p.Zip == nil { | |
return false | |
} | |
return true | |
} | |
func (p *Provider) IsSkilledNursingOrHomeHealth(rowTaxonomyProviderTypes []string) bool { | |
for _, SnfHhaCode := range [2]string{"SkilledNursingFacility", "HomeHealthAgency"} { | |
for _, taxCode := range rowTaxonomyProviderTypes { | |
if taxCode == SnfHhaCode { | |
return true | |
} | |
} | |
} | |
return false | |
} | |
// NilorValueString turns an optional string into a value suitable for a SQL
// argument. It returns the pointed-to string, or an untyped nil (written as
// NULL) when the pointer is nil.
func NilorValueString(fieldPointer *string) interface{} {
	if fieldPointer != nil {
		return *fieldPointer
	}
	return nil
}
// Filters CSV rows for the right taxonomy codes (facility type) that we need, | |
// placing the primary taxonomy code first, if specified. | |
// Maps code to actual provider type name. | |
func FindAndMapTaxonomyCodes(row *[]string) ([]string, bool) { | |
codes := []string{} | |
for _, v := range [15]int{47, 51, 55, 59, 63, 67, 71, 75, 79, 83, 87, 91, 95, 99, 103} { | |
taxCode, present := taxonomyCodes[(*row)[v]] | |
if present { | |
if (*row)[v+3] == "Y" { | |
codes = append([]string{taxCode}, codes...) | |
} else { | |
codes = append(codes, taxCode) | |
} | |
} | |
} | |
return codes, len(codes) > 0 | |
} | |
// Build a provider struct, using runtime reflection, provided by Go's standard 'reflect' package. | |
func BuildProvider(row *[]string) Provider { | |
provider := Provider{} | |
providerValue := reflect.ValueOf(&provider).Elem() | |
providerFields := map[string]int{ | |
"Npi": 0, | |
"Name": 4, | |
"Street": 20, | |
"Street2": 21, | |
"City": 22, | |
"State": 23, | |
"Zip": 24, | |
"Phone": 26, | |
} | |
for fieldName, fieldRowIndex := range providerFields { | |
field := providerValue.FieldByName(fieldName) | |
if fieldName != "Npi" { | |
if len((*row)[fieldRowIndex]) > 1 { | |
if fieldName == "Phone" { | |
phoneNumber := (*row)[26][:3] + "-" + (*row)[26][3:6] + "-" + (*row)[26][6:] | |
field.Set(reflect.ValueOf(&phoneNumber)) | |
} else { | |
fieldValue := (*row)[fieldRowIndex] | |
field.Set(reflect.ValueOf(&fieldValue)) | |
} | |
} | |
} else { | |
npi, _ := strconv.ParseInt((*row)[fieldRowIndex], 10, 0) | |
if npi != 0 { | |
field.Set(reflect.ValueOf(&npi)) | |
} | |
} | |
} | |
return provider | |
} | |
// With the CSV indices generated from GenerateMedicareColumnIndices(), | |
// this creates a slice of all potential medicare numbers (length six elements in the CSV row). | |
func FindMedicareNumbers(row *[]string) []string { | |
var medicareNumbers []string | |
for _, v := range medicareColumnIndices { | |
if len((*row)[v]) == 6 { | |
medicareNumbers = append(medicareNumbers, (*row)[v]) | |
} | |
} | |
return medicareNumbers | |
} | |
// GenerateMedicareColumnIndices returns the 50 CSV column indices that may
// hold a medicare provider number: every fourth column from 107 through 303.
func GenerateMedicareColumnIndices() [50]int {
	var indices [50]int
	for i := range indices {
		indices[i] = 107 + 4*i
	}
	return indices
}
// BuildCSVFields splits one line of the NPI CSV file into its fields. It
// assumes every field is double-quoted (`"a","b",...`). The line is split on
// the `","` separators, the opening quote of the first field and the closing
// quote of the last field are trimmed, and doubled quotes (`""`, the CSV
// escape for a literal quote) are unescaped in every field.
//
// A field that itself contains the sequence `","` is not supported and will
// be split.
func BuildCSVFields(csvRow string) []string {
	fields := strings.Split(csvRow, "\",\"")
	last := len(fields) - 1
	fields[0] = strings.TrimPrefix(fields[0], "\"")
	fields[last] = strings.TrimSuffix(fields[last], "\"")
	for i, field := range fields {
		fields[i] = strings.Replace(field, "\"\"", "\"", -1)
	}
	return fields
}
// ActiveFacility reports whether the row should be treated as active. That is
// the case when the deactivation column (39) is blank, or when the
// reactivation column (40) is non-blank. Only space characters count as
// blank, and the two dates are not compared with each other.
func ActiveFacility(csvRow []string) bool {
	deactivated := strings.Trim(csvRow[39], " ") != ""
	reactivated := strings.Trim(csvRow[40], " ") != ""
	return !deactivated || reactivated
}
// Container method for all filtering of relevant data, and database queries. | |
func StoreCSVRowToDB(row *[]string, db *sql.DB, rowTaxonomyProviderTypes []string, rowMedicareNumbers []string) { | |
defer wg.Done() | |
provider := BuildProvider(row) | |
if !provider.QualityDataProvider() { | |
return | |
} | |
if len(rowMedicareNumbers) > 0 && provider.IsSkilledNursingOrHomeHealth(rowTaxonomyProviderTypes) { | |
for { | |
if provider.UpdateOrCreateProvider(db, row, rowMedicareNumbers, rowTaxonomyProviderTypes) { | |
break | |
} | |
} | |
} else { | |
for { | |
if provider.StoreProviderCSVRowToDB(db, rowMedicareNumbers, rowTaxonomyProviderTypes) { | |
break | |
} | |
} | |
} | |
} | |
func ShowProgress() { | |
rowCounter++ | |
if int(rowCounter)%200000 == 0 { | |
fmt.Printf("%.2f%%\n", (rowCounter/rowCount)*float64(100)) | |
} | |
} | |
func main() { | |
db, err := sql.Open("mysql", MYSQLAUTH) | |
if err != nil { | |
panic(err.Error()) | |
} | |
defer db.Close() | |
db.SetMaxOpenConns(30) | |
err2 := db.Ping() | |
if err2 != nil { | |
panic(err2.Error()) | |
} | |
file, err := os.Open(NPIFilePath) | |
defer file.Close() | |
scanner := bufio.NewScanner(file) | |
//Skip CSV header line. | |
success := scanner.Scan() | |
if !success { | |
return | |
} | |
rowCounter++ | |
start := time.Now() | |
for { | |
if scanner.Scan() { | |
tokenizedLine := BuildCSVFields(scanner.Text()) | |
rowTaxonomyProviderTypes, any := FindAndMapTaxonomyCodes(&tokenizedLine) | |
if any && ActiveFacility(tokenizedLine) { | |
wg.Add(1) | |
go StoreCSVRowToDB(&tokenizedLine, db, rowTaxonomyProviderTypes, FindMedicareNumbers(&tokenizedLine)) | |
} | |
ShowProgress() | |
} else { | |
break | |
} | |
} | |
wg.Wait() | |
fmt.Println(time.Since(start)) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment