Commit 51c461b5 authored by Rustam Lotsmanenko (EPAM)'s avatar Rustam Lotsmanenko (EPAM)
Browse files

GONRG-693 POC

Refactoring , moved SchedulerImpl to core
parent e434a83d
......@@ -2,9 +2,9 @@ package org.opengroup.osdu.backup.api;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.opengroup.osdu.backup.model.Backup;
import org.opengroup.osdu.backup.model.Backups;
import org.opengroup.osdu.backup.model.SubmitTask;
import org.opengroup.osdu.backup.model.BackupImportRequest;
import org.opengroup.osdu.backup.model.BackupsResponse;
import org.opengroup.osdu.backup.model.SubmitTaskRequest;
import org.opengroup.osdu.backup.provider.interfaces.BackupService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
......@@ -24,25 +24,25 @@ public class BackupApi {
private BackupService backupService;
@PostMapping("/submitScheduledTask")
public ResponseEntity<String> submitScheduledTask(@RequestBody SubmitTask submitTask) {
backupService.submitScheduledTask(submitTask);
public ResponseEntity<String> submitScheduledTask(@RequestBody SubmitTaskRequest submitTaskRequest) {
backupService.submitScheduledTask(submitTaskRequest);
return new ResponseEntity<String>("Task submitted", HttpStatus.OK);
}
@PostMapping("/submitImport")
public ResponseEntity<String> submitImport(@RequestBody Backup backup) {
public ResponseEntity<String> submitImport(@RequestBody BackupImportRequest backup) {
backupService.submitImportRequest(backup);
return new ResponseEntity<String>("Import request submitted", HttpStatus.OK);
}
@GetMapping("/listSchedules")
public ResponseEntity<List<SubmitTask>> listSchedules() {
return new ResponseEntity<List<SubmitTask>>(backupService.listSchedules(), HttpStatus.OK);
public ResponseEntity<List<SubmitTaskRequest>> listSchedules() {
return new ResponseEntity<List<SubmitTaskRequest>>(backupService.listSchedules(), HttpStatus.OK);
}
@GetMapping("/listBackups")
public ResponseEntity<Backups> listBackups() {
return new ResponseEntity<Backups>(backupService.listBackups(), HttpStatus.OK);
public ResponseEntity<BackupsResponse> listBackups() {
return new ResponseEntity<BackupsResponse>(backupService.listBackups(), HttpStatus.OK);
}
}
package org.opengroup.osdu.backup.model;
public enum AssetType {
}
......@@ -9,7 +9,7 @@ import lombok.NoArgsConstructor;
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class Backup {
public class BackupImportRequest {
private String backupPath;
......
......@@ -11,7 +11,7 @@ import lombok.NoArgsConstructor;
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class Backups {
public class BackupsResponse {
HashMap<String, HashMap<String, List<String>>> backupsMap;
......
package org.opengroup.osdu.backup.model;
public class ImportRequest {
}
package org.opengroup.osdu.backup.model;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class SubmitRequest {
@JsonProperty("AssetType")
String dataType;
@JsonProperty("Context")
Map<String, Object> context;
}
package org.opengroup.osdu.backup.model;
import java.util.Date;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
......@@ -12,18 +11,18 @@ import lombok.NoArgsConstructor;
@AllArgsConstructor
@Builder
@EqualsAndHashCode(of = {"namespace", "kind"})
public class SubmitTask {
public class SubmitTaskRequest {
private String assetType;
private String namespace;
private String kind;
private Date date;
private String backupPath;
private int backupPeriod;//1
private int backupPeriod;
private int status;
private boolean active;
}
package org.opengroup.osdu.backup.provider.interfaces;
import java.util.List;
import org.opengroup.osdu.backup.model.Backup;
import org.opengroup.osdu.backup.model.Backups;
import org.opengroup.osdu.backup.model.SubmitTask;
import org.opengroup.osdu.backup.model.BackupImportRequest;
import org.opengroup.osdu.backup.model.BackupsResponse;
import org.opengroup.osdu.backup.model.SubmitTaskRequest;
public interface BackupRepository {
void exportBackup(SubmitTask submitTask);
void exportBackup(SubmitTaskRequest submitTaskRequest);
void importBackup(Backup backup);
void importBackup(BackupImportRequest backup);
List<SubmitTask> listBackupSchedules();
List<SubmitTaskRequest> listBackupSchedules();
Backups listAvailableBackups();
BackupsResponse listAvailableBackups();
}
package org.opengroup.osdu.backup.provider.interfaces;
import java.util.List;
import org.opengroup.osdu.backup.model.Backup;
import org.opengroup.osdu.backup.model.Backups;
import org.opengroup.osdu.backup.model.SubmitTask;
import org.opengroup.osdu.backup.model.BackupImportRequest;
import org.opengroup.osdu.backup.model.BackupsResponse;
import org.opengroup.osdu.backup.model.SubmitTaskRequest;
public interface BackupService {
void submitScheduledTask(SubmitTask submitTask);
void submitScheduledTask(SubmitTaskRequest submitTaskRequest);
void submitImportRequest(Backup backup);
void submitImportRequest(BackupImportRequest backup);
List<SubmitTask> listSchedules();
List<SubmitTaskRequest> listSchedules();
Backups listBackups();
BackupsResponse listBackups();
}
package org.opengroup.osdu.backup.provider.gcp.schedulers;
package org.opengroup.osdu.backup.shedulers;
import org.opengroup.osdu.backup.model.SubmitTask;
import org.opengroup.osdu.backup.model.SubmitTaskRequest;
import org.opengroup.osdu.backup.provider.interfaces.BackupRepository;
public class BackupScheduledTask implements Runnable {
private BackupRepository backupRepository;
private final BackupRepository backupRepository;
private SubmitTask submitTask;
private final SubmitTaskRequest submitTaskRequest;
public BackupScheduledTask(BackupRepository backupRepository,
SubmitTask submitTask) {
SubmitTaskRequest submitTaskRequest) {
this.backupRepository = backupRepository;
this.submitTask = submitTask;
this.submitTaskRequest = submitTaskRequest;
}
@Override
public void run() {
backupRepository.exportBackup(submitTask);
backupRepository.exportBackup(submitTaskRequest);
}
}
package org.opengroup.osdu.backup.shedulers;
import org.opengroup.osdu.backup.model.SubmitTask;
public interface Scheduler {
void addTaskToScheduler(SubmitTask submitTask);
void removeTaskFromScheduler(SubmitTask submitTask);
void taskStatusChanged(SubmitTask submitTask);
}
package org.opengroup.osdu.backup.shedulers;
import org.opengroup.osdu.backup.model.SubmitTaskRequest;
public interface SchedulerService {
void addTaskToScheduler(SubmitTaskRequest submitTaskRequest);
void removeTaskFromScheduler(SubmitTaskRequest submitTaskRequest);
void taskStatusChanged(SubmitTaskRequest submitTaskRequest);
}
package org.opengroup.osdu.backup.provider.gcp.schedulers;
package org.opengroup.osdu.backup.shedulers.impl;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.opengroup.osdu.backup.model.SubmitTask;
import org.opengroup.osdu.backup.model.SubmitTaskRequest;
import org.opengroup.osdu.backup.provider.interfaces.BackupRepository;
import org.opengroup.osdu.backup.shedulers.Scheduler;
import org.opengroup.osdu.backup.shedulers.BackupScheduledTask;
import org.opengroup.osdu.backup.shedulers.SchedulerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.support.PeriodicTrigger;
import org.springframework.stereotype.Service;
@Service
public class SchedulerImpl implements Scheduler {
public class SchedulerServiceImpl implements SchedulerService {
private final Map<SubmitTaskRequest, ScheduledFuture> taskMap = new HashMap<>();
@Autowired
private BackupRepository backupRepository;
private Map<SubmitTask, ScheduledFuture> taskMap = new HashMap<>();
private ThreadPoolTaskScheduler poolTaskScheduler;
@PostConstruct
public void initThreadPool() {
poolTaskScheduler = new ThreadPoolTaskScheduler();
poolTaskScheduler.setPoolSize(5);
poolTaskScheduler.setThreadNamePrefix("ThreadPoolTaskScheduler");
poolTaskScheduler.setRemoveOnCancelPolicy(true);
poolTaskScheduler.initialize();
}
@Override
public void addTaskToScheduler(SubmitTask submitTask) {
public void addTaskToScheduler(SubmitTaskRequest submitTaskRequest) {
PeriodicTrigger periodicTrigger
= new PeriodicTrigger(submitTask.getBackupPeriod(), TimeUnit.MINUTES);
= new PeriodicTrigger(submitTaskRequest.getBackupPeriod(), TimeUnit.MINUTES);
periodicTrigger.setFixedRate(true);
BackupScheduledTask backupScheduledTask = new BackupScheduledTask(backupRepository, submitTask);
BackupScheduledTask backupScheduledTask = new BackupScheduledTask(backupRepository, submitTaskRequest);
ScheduledFuture<?> schedule = poolTaskScheduler.schedule(backupScheduledTask, periodicTrigger);
taskMap.put(submitTask, schedule);
taskMap.put(submitTaskRequest, schedule);
}
@Override
public void removeTaskFromScheduler(SubmitTask submitTask) {
taskMap.get(submitTask).cancel(false);
public void removeTaskFromScheduler(SubmitTaskRequest submitTaskRequest) {
taskMap.get(submitTaskRequest).cancel(false);
}
@Override
public void taskStatusChanged(SubmitTask submitTask) {
if (submitTask.getStatus() == 0) {
taskMap.get(submitTask).cancel(false);
public void taskStatusChanged(SubmitTaskRequest submitTaskRequest) {
if (!submitTaskRequest.isActive()) {
taskMap.get(submitTaskRequest).cancel(false);
} else {
addTaskToScheduler(submitTask);
addTaskToScheduler(submitTaskRequest);
}
}
@PostConstruct
public void initThreadPool() {
poolTaskScheduler = new ThreadPoolTaskScheduler();
poolTaskScheduler.setPoolSize(5);
poolTaskScheduler.setThreadNamePrefix("ThreadPoolTaskScheduler");
poolTaskScheduler.setRemoveOnCancelPolicy(true);
poolTaskScheduler.initialize();
}
}
......@@ -67,11 +67,7 @@
<artifactId>google-cloud-datastore</artifactId>
<version>1.86.0</version>
</dependency>
<dependency>
<groupId>javax.persistence</groupId>
<artifactId>javax.persistence-api</artifactId>
<version>2.2</version>
</dependency>
<!-- Testing packages -->
<dependency>
<groupId>junit</groupId>
......
package org.opengroup.osdu.backup.provider.gcp;/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.opengroup.osdu.backup.provider.gcp;
public interface GcpPackageMarker {
......
package org.opengroup.osdu.backup.provider.gcp.conf;
package org.opengroup.osdu.backup.provider.gcp.config;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
......@@ -9,7 +9,7 @@ import org.springframework.context.annotation.Configuration;
@Configuration
@EnableDatastoreRepositories(basePackageClasses = GcpPackageMarker.class)
public class DatastoreConfiguration {
public class GCPConfiguration {
@Bean
public Storage googleCloudStorage() {
......
......@@ -5,7 +5,7 @@ import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import org.opengroup.osdu.backup.model.SubmitTask;
import org.opengroup.osdu.backup.model.SubmitTaskRequest;
import org.springframework.cloud.gcp.data.datastore.core.mapping.Entity;
import org.springframework.cloud.gcp.data.datastore.core.mapping.Field;
import org.springframework.data.annotation.Id;
......@@ -19,7 +19,10 @@ import org.springframework.data.annotation.Id;
public class GCPSubmitTask {
@Id
Long id;
private Long id;
@Field(name = "AssetType")
private String assetType;
@Field(name = "Namespace")
private String namespace;
......@@ -27,30 +30,32 @@ public class GCPSubmitTask {
@Field(name = "Kind")
private String kind;
@Field(name = "period")
@Field(name = "Period")
private int period;
@Field(name = "Path")
private String backupPath;
@Field(name = "Status")
private int status;
@Field(name = "Active")
private boolean active;
public GCPSubmitTask(SubmitTask submitTask) {
this.kind = submitTask.getKind();
this.namespace = submitTask.getNamespace();
this.period = submitTask.getBackupPeriod();
this.backupPath = submitTask.getBackupPath();
this.status = submitTask.getStatus();
public GCPSubmitTask(SubmitTaskRequest submitTaskRequest) {
this.assetType = submitTaskRequest.getAssetType();
this.kind = submitTaskRequest.getKind();
this.namespace = submitTaskRequest.getNamespace();
this.period = submitTaskRequest.getBackupPeriod();
this.backupPath = submitTaskRequest.getBackupPath();
this.active = submitTaskRequest.isActive();
}
public SubmitTask toSubmit() {
return SubmitTask.builder()
public SubmitTaskRequest toSubmit() {
return SubmitTaskRequest.builder()
.assetType(this.getAssetType())
.kind(this.kind)
.namespace(this.namespace)
.backupPath(this.backupPath)
.backupPeriod(this.period)
.status(this.status)
.active(this.active)
.build();
}
......
......@@ -23,14 +23,15 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.logging.Level;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import lombok.RequiredArgsConstructor;
import lombok.extern.java.Log;
import org.opengroup.osdu.backup.model.Backup;
import org.opengroup.osdu.backup.model.Backups;
import org.opengroup.osdu.backup.model.SubmitTask;
import org.opengroup.osdu.backup.model.BackupImportRequest;
import org.opengroup.osdu.backup.model.BackupsResponse;
import org.opengroup.osdu.backup.model.SubmitTaskRequest;
import org.opengroup.osdu.backup.provider.gcp.model.GCPSubmitTask;
import org.opengroup.osdu.backup.provider.interfaces.BackupRepository;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -42,7 +43,11 @@ import org.springframework.stereotype.Repository;
@RequiredArgsConstructor
public class BackupRepositoryImpl implements BackupRepository {
final Storage storage;
private static final String OUTPUT_URL_PREFIX = "outputUrlPrefix";
private JsonFactory JSON_FACTORY = new JacksonFactory();
private final Storage storage;
@Value("${gcp.backup.bucket}")
private String backupBucket;
......@@ -50,56 +55,50 @@ public class BackupRepositoryImpl implements BackupRepository {
@Value("${gcp.project.id}")
private String projectID;
private JsonFactory JSON_FACTORY = new JacksonFactory();
@Autowired
private TaskRepository repository;
private Datastore datastore;
@Override
public void exportBackup(SubmitTask submitTask) {
public void exportBackup(SubmitTaskRequest submitTaskRequest) {
log.log(Level.INFO, "Starting backup export for namespace:{0} kind:{1} export",
new Object[]{submitTask.getNamespace(), submitTask.getKind()});
new Object[]{submitTaskRequest.getNamespace(), submitTaskRequest.getKind()});
try {
GoogleDatastoreAdminV1EntityFilter googleDatastoreAdminV1EntityFilter = new GoogleDatastoreAdminV1EntityFilter();
googleDatastoreAdminV1EntityFilter.setKinds(Collections.singletonList(submitTask.getKind()));
googleDatastoreAdminV1EntityFilter.setNamespaceIds(Collections.singletonList(submitTask.getNamespace()));
GoogleDatastoreAdminV1ExportEntitiesRequest context = new GoogleDatastoreAdminV1ExportEntitiesRequest();
context.setEntityFilter(googleDatastoreAdminV1EntityFilter);
context.setOutputUrlPrefix(getOutputUrlPrefix(submitTask));
GoogleLongrunningOperation exportTaskCreationResponse = getDatastore().projects().export(projectID, context)
GoogleDatastoreAdminV1ExportEntitiesRequest context = buildExportEntitiesRequest(submitTaskRequest);
Datastore datastore = getDatastore();
GoogleLongrunningOperation exportTaskCreationResponse = datastore.projects().export(projectID, context)
.execute();
Operations dataStoreOperations = getDatastore().projects().operations();
Operations dataStoreOperations = datastore.projects().operations();
GoogleLongrunningOperation exportTaskResultResponse = dataStoreOperations
.get(exportTaskCreationResponse.getName()).execute();
Object outputUrlPrefix = exportTaskResultResponse.getMetadata().get("outputUrlPrefix");
Object outputUrlPrefix = exportTaskResultResponse.getMetadata().get(OUTPUT_URL_PREFIX);
log.log(Level.INFO, "Export status done:{0} backup path:{1} export",
new Object[]{exportTaskResultResponse.getDone(), outputUrlPrefix});
GCPSubmitTask oneByKindAndNamespace = repository
.findOneByKindAndNamespace(submitTask.getKind(), submitTask.getNamespace());
.findOneByKindAndNamespace(submitTaskRequest.getKind(), submitTaskRequest.getNamespace());
oneByKindAndNamespace.setBackupPath(outputUrlPrefix.toString());
repository.save(oneByKindAndNamespace);
} catch (IOException e) {
e.printStackTrace();
// TODO OSDU
}
}
@Override
public void importBackup(Backup backup) {
public void importBackup(BackupImportRequest backup) {
try {
GoogleDatastoreAdminV1ImportEntitiesRequest importRequest = new GoogleDatastoreAdminV1ImportEntitiesRequest();
importRequest.setInputUrl(backup.getBackupPath());
......@@ -110,7 +109,7 @@ public class BackupRepositoryImpl implements BackupRepository {
}
@Override
public List<SubmitTask> listBackupSchedules() {
public List<SubmitTaskRequest> listBackupSchedules() {
Iterable<GCPSubmitTask> gcpSubmitTasks = repository.findAll();
List<GCPSubmitTask> collect = StreamSupport.stream(gcpSubmitTasks.spliterator(), false)
.collect(Collectors.toList());
......@@ -118,8 +117,8 @@ public class BackupRepositoryImpl implements BackupRepository {
}
@Override
public Backups listAvailableBackups() {
Backups backups = new Backups();
public BackupsResponse listAvailableBackups() {
BackupsResponse backupsResponse = new BackupsResponse();
Page<Blob> list = storage.list(backupBucket);