Created
July 14, 2020 11:58
-
-
Save meetme2meat/1da0904eb3b3c46ff9a5c6df6ff5ec83 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"database/sql" | |
"fmt" | |
"log" | |
"os" | |
"sync" | |
"time" | |
"github.com/jinzhu/gorm" | |
_ "github.com/jinzhu/gorm/dialects/postgres" | |
) | |
// FileStreamer is the row model written to the "file_stream" table by the
// assertion helpers in this file. Nullable text columns use sql.NullString.
type FileStreamer struct {
	// FileStreamID is the generated key of the row. It is tagged PRIMARY_KEY
	// (as FileTracker's key is) because the field is not named ID, so gorm
	// would otherwise have no primary field to read the generated value
	// back into after Create.
	FileStreamID uint64 `gorm:"column:file_stream_id;AUTO_INCREMENT;PRIMARY_KEY"`

	// Remote endpoint description.
	RemoteProtocol    sql.NullString `gorm:"column:remote_protocol"`
	RemoteCredentials sql.NullString `gorm:"column:remote_credentials"`
	RemoteServer      sql.NullString `gorm:"column:remote_server"`
	RemotePath        sql.NullString `gorm:"column:remote_path"`
	FileRegex         sql.NullString `gorm:"column:file_regex"`

	// Local destination and download settings.
	LocalPath     sql.NullString `gorm:"column:local_path"`
	Downloadlimit uint64         `gorm:"column:download_limit"`

	// Compression settings.
	IsCompressed      bool           `gorm:"column:is_compressed"`
	UncompressedRegex sql.NullString `gorm:"column:uncompressed_regex"`
	CompressionType   sql.NullString `gorm:"column:compression_type"`

	// Classification and state flags.
	Type         sql.NullString `gorm:"column:type"`
	Prefix       sql.NullString `gorm:"column:prefix"`
	IsLoaded     bool           `gorm:"column:is_loaded"`
	IsDownloaded bool           `gorm:"column:is_downloaded"`

	// CreatedDate is stored as text; callers in this file format it as
	// "2006-01-02 15:04:05".
	CreatedDate sql.NullString `gorm:"column:created_date"`
}
// FileTracker is the row model for the "file_tracker" table: one record per
// file name within a file stream.
type FileTracker struct {
	// Keys.
	FileTrackerId uint64 `gorm:"column:file_tracker_id;AUTO_INCREMENT;PRIMARY_KEY"`
	FileStreamId  uint64 `gorm:"column:file_stream_id"`

	// Download window, kept as preformatted strings.
	DownloadStartDate string `gorm:"column:download_start_date"`
	DownloadEndDate   string `gorm:"column:download_end_date"`

	// File identification.
	FileName         string `gorm:"column:file_name"`
	UploadedFileName string `gorm:"column:uploaded_file_name"`

	// ErrorsCount maps to the errors_count column.
	ErrorsCount uint64 `gorm:"column:errors_count"`
}
var DB *gorm.DB | |
func connect() { | |
var err error | |
DB, err = gorm.Open("postgres", "host=localhost port=5432 user=admin dbname=cdr_archiver sslmode=disable") | |
if err != nil { | |
log.Panic("[DB] Connect err: ", err) | |
} | |
DB.LogMode(true) | |
} | |
func assertionRun() { | |
fileStream := FileStreamer{} | |
dir, _ := os.Getwd() | |
remotePath := fmt.Sprintf("%s/albmeta_remote_cdrs/", dir) | |
localPath := fmt.Sprintf("%s/albmeta_local_cdrs/", dir) | |
fileStream.RemoteProtocol = sql.NullString{String: "ftp", Valid: true} | |
fileStream.RemoteCredentials = sql.NullString{String: "testuser:tiger", Valid: true} | |
fileStream.RemoteServer = sql.NullString{String: remotePath, Valid: true} | |
fileStream.RemotePath = sql.NullString{String: remotePath, Valid: true} | |
fileStream.FileRegex = sql.NullString{String: "/(.*).cdr$/", Valid: true} | |
fileStream.LocalPath = sql.NullString{String: localPath, Valid: true} | |
fileStream.IsCompressed = false | |
fileStream.Type = sql.NullString{String: "Cdr_AlbaMeta", Valid: true} | |
fileStream.IsLoaded = true | |
fileStream.IsDownloaded = true | |
fileStream.UncompressedRegex = sql.NullString{String: "", Valid: true} | |
fmt.Println("... ") | |
fileStream.CreatedDate = sql.NullString{String: time.Now().Format("2006-01-02 15:04:05"), Valid: true} | |
err := DB.Table("file_stream").Create(&fileStream).Error | |
if err != nil { | |
log.Fatal(err) | |
} | |
service := Find("AlbMeta") | |
service.SetStreamId(fileStream.FileStreamID) | |
err = service.Run("NVX00710.cdr") | |
if err != nil { | |
log.Fatal(err) | |
} | |
var count int | |
DB.Table("file_tracker").Where("file_stream_id = ?", fileStream.FileStreamID).Count(&count) | |
if count != 1 { | |
log.Fatalf("we got a different value other than %d", 1) | |
} | |
} | |
func assertionRun2() { | |
fileStream := FileStreamer{} | |
dir, _ := os.Getwd() | |
remotePath := fmt.Sprintf("%s/telus_remote_cdrs/", dir) | |
localPath := fmt.Sprintf("%s/telus_local_cdrs/", dir) | |
fileStream.RemoteProtocol = sql.NullString{String: "ftp", Valid: true} | |
fileStream.RemoteCredentials = sql.NullString{String: "testuser:tiger", Valid: true} | |
fileStream.RemoteServer = sql.NullString{String: remotePath, Valid: true} | |
fileStream.RemotePath = sql.NullString{String: remotePath, Valid: true} | |
fileStream.FileRegex = sql.NullString{String: "/(.*).cdr$/", Valid: true} | |
fileStream.LocalPath = sql.NullString{String: localPath, Valid: true} | |
fileStream.IsCompressed = false | |
fileStream.Type = sql.NullString{String: "Cdr_Telus26", Valid: true} | |
fileStream.IsLoaded = true | |
fileStream.IsDownloaded = true | |
fileStream.UncompressedRegex = sql.NullString{String: "", Valid: true} | |
fmt.Println("... ") | |
fileStream.CreatedDate = sql.NullString{String: time.Now().Format("2006-01-02 15:04:05"), Valid: true} | |
err := DB.Table("file_stream").Create(&fileStream).Error | |
if err != nil { | |
log.Fatal(err) | |
} | |
service := Find("Telus") | |
service.SetStreamId(fileStream.FileStreamID) | |
err = service.Run("MTU00710.cdr") | |
if err != nil { | |
log.Fatal(err) | |
} | |
var count int | |
DB.Table("file_tracker").Where("file_stream_id = ?", fileStream.FileStreamID).Count(&count) | |
if count != 1 { | |
log.Fatalf("we got a different value other than %d", 1) | |
} | |
} | |
type Base struct { | |
FileStreamID uint64 | |
status string | |
d Downloader | |
} | |
func (b *Base) SetStreamId(id uint64) { | |
b.FileStreamID = id | |
} | |
func (b *Base) Run(name string) error { | |
// Do some processing .. | |
// | |
// b.d.SetProtocol() | |
// | |
fmt.Println("Inside Run") | |
fileTracker, err := FetchFileTracker(name, b.FileStreamID) | |
fileTracker.DownloadStartDate = time.Now().Format("2006-01-02 15:04:05") | |
fileTracker.DownloadEndDate = time.Now().Add(10 * time.Second).Format("2006-01-02 15:04:05") | |
if err != nil { | |
log.Fatalf("Tracker insert error %s", err) | |
} | |
fmt.Println("Outside ") | |
// add some more value to fileTracker | |
return StoreFileTracker(&fileTracker) | |
} | |
// Downloader is the behaviour a provider must supply to be wrapped in a Base.
type Downloader interface {
	// Name returns the provider's identifier.
	Name() string

	// SetProtocol performs the provider's protocol setup and reports any
	// failure.
	SetProtocol() error
}
// telus is the Downloader that Find hands out for the "Telus" service.
type telus struct{}

// Name reports the provider identifier, "telus".
func (*telus) Name() string {
	const providerName = "telus"
	return providerName
}

// SetProtocol is a placeholder for protocol processing; it currently does
// nothing and always returns nil.
func (*telus) SetProtocol() error {
	return nil
}
// albmeta is the Downloader that Find hands out for the "AlbMeta" service.
type albmeta struct{}

// Name reports the provider identifier, "albmeta".
func (*albmeta) Name() string {
	const providerName = "albmeta"
	return providerName
}

// SetProtocol is a placeholder for protocol processing; it currently does
// nothing and always returns nil.
func (*albmeta) SetProtocol() error {
	return nil
}
func Find(name string) *Base { | |
switch name { | |
case "Telus": | |
return &Base{d: &telus{}, status: "running"} | |
case "AlbMeta": | |
return &Base{d: &albmeta{}, status: "running"} | |
} | |
return nil | |
} | |
func StoreFileTracker(fileTracker *FileTracker) error { | |
return DB.Table("file_tracker").Create(fileTracker).Error | |
} | |
func FetchFileTracker(name string, id uint64) (FileTracker, error) { | |
var fileTracker FileTracker | |
db := DB.Table("file_tracker").Where("file_stream_id = ?", id).Where("file_name = ?", name).FirstOrInit(&fileTracker) | |
if db.Error == nil { | |
fileTracker.FileStreamId = id | |
fileTracker.FileName = name | |
} | |
return fileTracker, db.Error | |
} | |
func main() { | |
connect() | |
var wg sync.WaitGroup | |
wg.Add(1) | |
go func() { | |
defer wg.Done() | |
for { | |
assertionRun() | |
} | |
}() | |
wg.Add(1) | |
go func() { | |
defer wg.Done() | |
for { | |
assertionRun2() | |
} | |
}() | |
fmt.Println("Main") | |
wg.Wait() | |
} |
Author
meetme2meat
commented
Jul 14, 2020
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment