Commit d83e0e8d authored by Rustam Lotsmanenko (EPAM)'s avatar Rustam Lotsmanenko (EPAM)
Browse files

GONRG-693 POC

Task Scheduling draft
parent d868bec2
package org.opengroup.osdu.backup.api;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.opengroup.osdu.backup.model.ImportRequest;
import org.opengroup.osdu.backup.model.SubmitRequest;
import org.opengroup.osdu.backup.model.Backup;
import org.opengroup.osdu.backup.model.SubmitTask;
import org.opengroup.osdu.backup.provider.interfaces.BackupService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
......@@ -21,24 +22,26 @@ public class BackupApi {
@Autowired
private BackupService backupService;
@PostMapping("/submitTask/")
public ResponseEntity<String> submitTask(@RequestBody SubmitRequest submitRequest) {
backupService.submitRequest(submitRequest);
return new ResponseEntity<String>("Backup service is alive", HttpStatus.OK);
}
@PostMapping("/submitTask2/")
public ResponseEntity<String> submitTask2(@RequestBody SubmitTask submitTask) {
backupService.submitTask(submitTask);
@PostMapping("/submitScheduledTask")
public ResponseEntity<String> submitScheduledTask(@RequestBody SubmitTask submitTask) {
backupService.submitScheduledTask(submitTask);
return new ResponseEntity<String>("Task submitted", HttpStatus.OK);
}
@PostMapping("/submitImport")
public ResponseEntity<String> submitImport(@RequestBody ImportRequest importRequest) {
backupService.submitImportRequest(importRequest);
public ResponseEntity<String> submitImport(@RequestBody Backup backup) {
backupService.submitImportRequest(backup);
return new ResponseEntity<String>("Import request submitted", HttpStatus.OK);
}
@GetMapping("/listSchedules")
public ResponseEntity<List<SubmitTask>> listSchedules() {
return new ResponseEntity<List<SubmitTask>>(backupService.listSchedules(), HttpStatus.OK);
}
@GetMapping("/listBackups")
public ResponseEntity<List<Backup>> listBackups() {
return new ResponseEntity<List<Backup>>(backupService.listBackups(), HttpStatus.OK);
}
}
package org.opengroup.osdu.backup.model;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class Backup {
private String backupPath;
}
......@@ -4,12 +4,14 @@ import java.util.Date;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
@EqualsAndHashCode(of = {"namespace", "kind"})
public class SubmitTask {
private String namespace;
......
package org.opengroup.osdu.backup.provider.interfaces;
import java.util.List;
import org.opengroup.osdu.backup.model.Backup;
import org.opengroup.osdu.backup.model.SubmitTask;
public interface BackupRepository {
void exportBackup(String namespace, String kind);
void exportBackup(SubmitTask submitTask);
void importBackup(Backup backup);
List<SubmitTask> listBackupSchedules();
void importBackup(String backupLocationPath);
List<Backup> listAvailableBackups();
}
package org.opengroup.osdu.backup.provider.interfaces;
import org.opengroup.osdu.backup.model.ImportRequest;
import org.opengroup.osdu.backup.model.SubmitRequest;
import java.util.List;
import org.opengroup.osdu.backup.model.Backup;
import org.opengroup.osdu.backup.model.SubmitTask;
public interface BackupService {
void submitRequest(SubmitRequest submitRequest);
void submitScheduledTask(SubmitTask submitTask);
void submitTask(SubmitTask submitTask);
void submitImportRequest(ImportRequest importRequest);
void submitImportRequest(Backup backup);
List<SubmitTask> listSchedules();
List<Backup> listBackups();
}
......@@ -6,4 +6,7 @@ public interface Scheduler {
void addTaskToScheduler(SubmitTask submitTask);
void removeTaskFromScheduler(SubmitTask submitTask);
void taskStatusChanged(SubmitTask submitTask);
}
......@@ -46,6 +46,12 @@
<version>1.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-gcp-starter-storage</artifactId>
<version>1.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>com.google.apis</groupId>
<artifactId>google-api-services-datastore</artifactId>
......@@ -62,11 +68,10 @@
<version>1.86.0</version>
</dependency>
<dependency>
<groupId>org.mapstruct</groupId>
<artifactId>mapstruct</artifactId>
<version>1.3.1.Final</version>
<groupId>javax.persistence</groupId>
<artifactId>javax.persistence-api</artifactId>
<version>2.2</version>
</dependency>
<!-- Testing packages -->
<dependency>
<groupId>junit</groupId>
......
package org.opengroup.osdu.backup.provider.gcp.conf;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import org.opengroup.osdu.backup.provider.gcp.GcpPackageMarker;
import org.springframework.cloud.gcp.data.datastore.repository.config.EnableDatastoreRepositories;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@EnableDatastoreRepositories(basePackageClasses = GcpPackageMarker.class)
public class DatastoreConfiguration {
@Bean
public Storage googleCloudStorage() {
return StorageOptions.getDefaultInstance().getService();
}
}
package org.opengroup.osdu.backup.provider.gcp.mapper;
import org.opengroup.osdu.backup.model.SubmitTask;
import org.opengroup.osdu.backup.provider.gcp.model.GCPSubmitTask;
import org.springframework.stereotype.Service;
@Service
public class SubmitTaskMapper {
public GCPSubmitTask toGcpSubmitTask(SubmitTask submitTask) {
return GCPSubmitTask.builder()
.kind(submitTask.getKind())
.namespace(submitTask.getNamespace())
.period(submitTask.getBackupPeriod())
.build();
}
}
......@@ -3,7 +3,9 @@ package org.opengroup.osdu.backup.provider.gcp.model;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import org.opengroup.osdu.backup.model.SubmitTask;
import org.springframework.cloud.gcp.data.datastore.core.mapping.Entity;
import org.springframework.cloud.gcp.data.datastore.core.mapping.Field;
import org.springframework.data.annotation.Id;
......@@ -13,6 +15,7 @@ import org.springframework.data.annotation.Id;
@NoArgsConstructor
@AllArgsConstructor
@Builder
@EqualsAndHashCode(of = {"namespace", "kind"})
public class GCPSubmitTask {
@Id
......@@ -33,4 +36,22 @@ public class GCPSubmitTask {
@Field(name = "Status")
private int status;
public GCPSubmitTask(SubmitTask submitTask) {
this.kind = submitTask.getKind();
this.namespace = submitTask.getNamespace();
this.period = submitTask.getBackupPeriod();
this.backupPath = submitTask.getBackupPath();
this.status = submitTask.getStatus();
}
public SubmitTask toSubmit() {
return SubmitTask.builder()
.kind(this.kind)
.namespace(this.namespace)
.backupPath(this.backupPath)
.backupPeriod(this.period)
.status(this.status)
.build();
}
}
......@@ -4,7 +4,9 @@ package org.opengroup.osdu.backup.provider.gcp.repository;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.gax.paging.Page;
import com.google.api.services.datastore.v1.Datastore;
import com.google.api.services.datastore.v1.Datastore.Projects.Operations;
import com.google.api.services.datastore.v1.model.GoogleDatastoreAdminV1EntityFilter;
import com.google.api.services.datastore.v1.model.GoogleDatastoreAdminV1ExportEntitiesRequest;
import com.google.api.services.datastore.v1.model.GoogleDatastoreAdminV1ImportEntitiesRequest;
......@@ -12,14 +14,21 @@ import com.google.api.services.datastore.v1.model.GoogleLongrunningOperation;
import com.google.api.services.iam.v1.IamScopes;
import com.google.auth.http.HttpCredentialsAdapter;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import lombok.RequiredArgsConstructor;
import lombok.extern.java.Log;
import org.opengroup.osdu.backup.model.Backup;
import org.opengroup.osdu.backup.model.SubmitTask;
import org.opengroup.osdu.backup.provider.gcp.model.GCPSubmitTask;
import org.opengroup.osdu.backup.provider.interfaces.BackupRepository;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -28,8 +37,11 @@ import org.springframework.stereotype.Repository;
@Log
@Repository
@RequiredArgsConstructor
public class BackupRepositoryImpl implements BackupRepository {
final Storage storage;
@Value("${gcp.backup.bucket}")
private String backupBucket;
......@@ -45,29 +57,38 @@ public class BackupRepositoryImpl implements BackupRepository {
@Override
public void exportBackup(String namespace, String kind) {
public void exportBackup(SubmitTask submitTask) {
log.log(Level.INFO, "Starting backup export for namespace:{0} kind:{1} export",
new Object[]{namespace, kind});
new Object[]{submitTask.getNamespace(), submitTask.getKind()});
try {
GoogleDatastoreAdminV1EntityFilter googleDatastoreAdminV1EntityFilter = new GoogleDatastoreAdminV1EntityFilter();
googleDatastoreAdminV1EntityFilter.setKinds(Collections.singletonList(kind));
googleDatastoreAdminV1EntityFilter.setNamespaceIds(Collections.singletonList(namespace));
googleDatastoreAdminV1EntityFilter.setKinds(Collections.singletonList(submitTask.getKind()));
googleDatastoreAdminV1EntityFilter.setNamespaceIds(Collections.singletonList(submitTask.getNamespace()));
GoogleDatastoreAdminV1ExportEntitiesRequest context = new GoogleDatastoreAdminV1ExportEntitiesRequest();
context.setEntityFilter(googleDatastoreAdminV1EntityFilter);
context.setOutputUrlPrefix(backupBucket);
GoogleLongrunningOperation execute = getDatastore().projects().export(projectID, context).execute();
context.setOutputUrlPrefix("gs://" + backupBucket);
GoogleLongrunningOperation exportTaskCreationResponse = getDatastore().projects().export(projectID, context)
.execute();
Operations dataStoreOperations = getDatastore().projects().operations();
log.log(Level.INFO, "Export status:{0} response:{1} export",
new Object[]{execute.getDone(), execute.getResponse()});
GoogleLongrunningOperation exportTaskResultResponse = dataStoreOperations
.get(exportTaskCreationResponse.getName()).execute();
Map<String, Object> response = execute.getResponse();
Object outputUrlPrefix = exportTaskResultResponse.getMetadata().get("outputUrlPrefix");
GCPSubmitTask submittedTask = GCPSubmitTask.builder().kind(kind).namespace(namespace)
.backupPath(response.get("path").toString()).build();
log.log(Level.INFO, "Export status done:{0} backup path:{1} export",
new Object[]{exportTaskResultResponse.getDone(), outputUrlPrefix});
repository.save(submittedTask);
GCPSubmitTask oneByKindAndNamespace = repository
.findOneByKindAndNamespace(submitTask.getKind(), submitTask.getNamespace());
oneByKindAndNamespace.setBackupPath(outputUrlPrefix.toString());
repository.save(oneByKindAndNamespace);
} catch (IOException e) {
e.printStackTrace();
......@@ -75,16 +96,35 @@ public class BackupRepositoryImpl implements BackupRepository {
}
@Override
public void importBackup(String backupLocationPath) {
public void importBackup(Backup backup) {
try {
GoogleDatastoreAdminV1ImportEntitiesRequest importRequest = new GoogleDatastoreAdminV1ImportEntitiesRequest();
importRequest.setInputUrl(backupLocationPath);
importRequest.setInputUrl(backup.getBackupPath());
getDatastore().projects().datastoreImport(projectID, importRequest).execute();
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
@Override
public List<SubmitTask> listBackupSchedules() {
Iterable<GCPSubmitTask> gcpSubmitTasks = repository.findAll();
List<GCPSubmitTask> collect = StreamSupport.stream(gcpSubmitTasks.spliterator(), false)
.collect(Collectors.toList());
return collect.stream().map(GCPSubmitTask::toSubmit).collect(Collectors.toList());
}
@Override
public List<Backup> listAvailableBackups() {
ArrayList<Backup> backups = new ArrayList<>();
Page<Blob> list = storage.list(backupBucket);
for (Blob summary : list.getValues()) {
BlobId blobId = summary.getBlobId();
backups.add(new Backup("gs://" + blobId.getBucket() + "/" + blobId.getName()));
}
return backups;
}
private Datastore getDatastore() {
if (datastore == null) {
try {
......
package org.opengroup.osdu.backup.provider.gcp.repository;
import org.opengroup.osdu.backup.model.ImportRequest;
import org.opengroup.osdu.backup.model.SubmitRequest;
import java.util.List;
import javax.annotation.PostConstruct;
import org.opengroup.osdu.backup.model.Backup;
import org.opengroup.osdu.backup.model.SubmitTask;
import org.opengroup.osdu.backup.provider.gcp.mapper.SubmitTaskMapper;
import org.opengroup.osdu.backup.provider.gcp.model.GCPSubmitTask;
import org.opengroup.osdu.backup.provider.interfaces.BackupRepository;
import org.opengroup.osdu.backup.provider.interfaces.BackupService;
......@@ -21,27 +21,45 @@ public class BackupServiceImpl implements BackupService {
@Autowired
private TaskRepository taskRepository;
@Autowired
private SubmitTaskMapper submitTaskMapper;
@Autowired
private Scheduler scheduler;
@Override
public void submitRequest(SubmitRequest submitRequest) {
backupRepository.exportBackup(
"opendes", "BackupTestKind");
public void submitScheduledTask(SubmitTask submitTask) {
GCPSubmitTask taskExist = taskRepository
.findOneByKindAndNamespace(submitTask.getKind(), submitTask.getNamespace());
if (taskExist != null) {
if (submitTask.getStatus() != taskExist.getStatus()) {
scheduler.taskStatusChanged(submitTask);
taskExist.setStatus(submitTask.getStatus());
taskRepository.save(taskExist);
}
} else {
scheduler.addTaskToScheduler(submitTask);
taskRepository.save(new GCPSubmitTask(submitTask));
}
}
@Override
public void submitTask(SubmitTask submitTask) {
scheduler.addTaskToScheduler(submitTask);
GCPSubmitTask gcpSubmitTask = submitTaskMapper.toGcpSubmitTask(submitTask);
taskRepository.save(gcpSubmitTask);
public void submitImportRequest(Backup backup) {
backupRepository.importBackup(backup);
}
@Override
public void submitImportRequest(ImportRequest importRequest) {
backupRepository.importBackup("");
public List<SubmitTask> listSchedules() {
return backupRepository.listBackupSchedules();
}
@Override
public List<Backup> listBackups() {
return backupRepository.listAvailableBackups();
}
@PostConstruct
public void startUpSchedulers() {
List<SubmitTask> submitTaskList = backupRepository.listBackupSchedules();
submitTaskList.stream().filter(t -> t.getStatus() == 1).forEach(t -> scheduler.addTaskToScheduler(t));
}
}
......@@ -7,4 +7,5 @@ import org.springframework.stereotype.Repository;
@Repository
public interface TaskRepository extends DatastoreRepository<GCPSubmitTask, Long> {
public GCPSubmitTask findOneByKindAndNamespace(String kind, String namespace);
}
......@@ -17,6 +17,6 @@ public class BackupScheduledTask implements Runnable {
@Override
public void run() {
backupRepository.exportBackup(submitTask.getNamespace(), submitTask.getKind());
backupRepository.exportBackup(submitTask);
}
}
package org.opengroup.osdu.backup.provider.gcp.schedulers;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.opengroup.osdu.backup.model.SubmitTask;
......@@ -16,8 +19,9 @@ public class SchedulerImpl implements Scheduler {
@Autowired
private BackupRepository backupRepository;
private ThreadPoolTaskScheduler poolTaskScheduler;
private Map<SubmitTask, ScheduledFuture> taskMap = new HashMap<>();
private ThreadPoolTaskScheduler poolTaskScheduler;
@Override
public void addTaskToScheduler(SubmitTask submitTask) {
......@@ -25,7 +29,22 @@ public class SchedulerImpl implements Scheduler {
= new PeriodicTrigger(submitTask.getBackupPeriod(), TimeUnit.MINUTES);
periodicTrigger.setFixedRate(true);
BackupScheduledTask backupScheduledTask = new BackupScheduledTask(backupRepository, submitTask);
poolTaskScheduler.schedule(backupScheduledTask, periodicTrigger);
ScheduledFuture<?> schedule = poolTaskScheduler.schedule(backupScheduledTask, periodicTrigger);
taskMap.put(submitTask, schedule);
}
@Override
public void removeTaskFromScheduler(SubmitTask submitTask) {
taskMap.get(submitTask).cancel(false);
}
@Override
public void taskStatusChanged(SubmitTask submitTask) {
if (submitTask.getStatus() == 0) {
taskMap.get(submitTask).cancel(false);
} else {
addTaskToScheduler(submitTask);
}
}
@PostConstruct
......@@ -33,6 +52,7 @@ public class SchedulerImpl implements Scheduler {
poolTaskScheduler = new ThreadPoolTaskScheduler();
poolTaskScheduler.setPoolSize(5);
poolTaskScheduler.setThreadNamePrefix("ThreadPoolTaskScheduler");
poolTaskScheduler.setRemoveOnCancelPolicy(true);
poolTaskScheduler.initialize();
}
}
logging.level.org.springframework=INFO
logging.level.org.springframework.boot.autoconfigure=ERROR
spring.cloud.gcp.datastore.namespace=opendes
gcp.backup.bucket=gs://osdu-cicd-epam-backup-service
gcp.backup.bucket=osdu-cicd-epam-backup-service
gcp.project.id=osdu-cicd-epam
server.servlet.contextPath=/backup/v1
server.port=8080
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment